This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new f8adc22119d Subscription: implement subscription event optimistic
transmission strategy to reduce peak memory usage (#13763) (#13928)
f8adc22119d is described below
commit f8adc22119de41663011db584907b8662e65e94b
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Oct 29 00:06:45 2024 +0800
Subscription: implement subscription event optimistic transmission strategy
to reduce peak memory usage (#13763) (#13928)
---
.../iotdb/rpc/subscription/config/TopicConfig.java | 7 +-
.../payload/poll/SubscriptionPollResponse.java | 20 +-
.../db/subscription/broker/SubscriptionBroker.java | 20 +-
.../broker/SubscriptionPrefetchingQueue.java | 21 +-
.../broker/SubscriptionPrefetchingTabletQueue.java | 7 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 6 +-
.../db/subscription/event/SubscriptionEvent.java | 298 +++++----------------
.../event/batch/SubscriptionPipeEventBatch.java | 84 +++++-
.../batch/SubscriptionPipeTabletEventBatch.java | 157 ++++-------
.../batch/SubscriptionPipeTsFileEventBatch.java | 81 +++---
.../cache/CachedSubscriptionPollResponse.java | 83 ++++++
.../SubscriptionPollResponseCache.java} | 43 +--
.../event/pipe/SubscriptionPipeEmptyEvent.java | 7 -
.../event/pipe/SubscriptionPipeEvents.java | 7 -
.../pipe/SubscriptionPipeTabletBatchEvents.java | 7 -
.../pipe/SubscriptionPipeTsFileBatchEvents.java | 14 +-
.../pipe/SubscriptionPipeTsFilePlainEvent.java | 7 -
.../SubscriptionEventExtendableResponse.java | 151 +++++++++++
.../SubscriptionEventResponse.java} | 36 ++-
.../response/SubscriptionEventSingleResponse.java | 122 +++++++++
.../response/SubscriptionEventTabletResponse.java | 162 +++++++++++
.../response/SubscriptionEventTsFileResponse.java | 170 ++++++++++++
.../receiver/SubscriptionReceiverV1.java | 2 +-
.../db/subscription/SubscriptionStatesTest.java | 5 -
24 files changed, 997 insertions(+), 520 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 4bec8d762a2..46dc7601e0f 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -106,7 +106,7 @@ public class TopicConfig extends PipeParameters {
}
public Map<String, String> getAttributesWithRealtimeMode() {
- return REALTIME_STREAM_MODE_CONFIG;
+ return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid)
}
public Map<String, String> getAttributesWithSourceMode() {
@@ -136,9 +136,6 @@ public class TopicConfig extends PipeParameters {
}
public Map<String, String> getAttributesWithSinkFormat() {
- return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
- attributes.getOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE))
- ? Collections.emptyMap()
- : SINK_TABLET_FORMAT_CONFIG;
+ return Collections.emptyMap(); // default to hybrid
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
index 01d173d2742..06baa30acee 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
public class SubscriptionPollResponse {
@@ -110,16 +112,18 @@ public class SubscriptionPollResponse {
return new SubscriptionPollResponse(responseType, payload, commitContext);
}
- /////////////////////////////// object ///////////////////////////////
+ /////////////////////////////// stringify ///////////////////////////////
@Override
public String toString() {
- return "SubscriptionPollResponse{responseType="
- + SubscriptionPollResponseType.valueOf(responseType).toString()
- + ", payload="
- + payload
- + ", commitContext="
- + commitContext
- + "}";
+ return "SubscriptionPollResponse" + coreReportMessage();
+ }
+
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> result = new HashMap<>();
+ result.put("responseType",
SubscriptionPollResponseType.valueOf(responseType).toString());
+ result.put("payload", payload.toString());
+ result.put("commitContext", commitContext.toString());
+ return result;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index afc8f2f2290..c4e47dec150 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -24,13 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
@@ -103,16 +101,14 @@ public class SubscriptionBroker {
brokerId);
events.add(
new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.TERMINATION.getType(),
- new TerminationPayload(),
- new SubscriptionCommitContext(
-
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
- PipeDataNodeAgent.runtime().getRebootTimes(),
- topicName,
- brokerId,
- INVALID_COMMIT_ID))));
+ SubscriptionPollResponseType.TERMINATION.getType(),
+ new TerminationPayload(),
+ new SubscriptionCommitContext(
+
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ PipeDataNodeAgent.runtime().getRebootTimes(),
+ topicName,
+ brokerId,
+ INVALID_COMMIT_ID)));
continue;
}
// There are two reasons for not printing logs here:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e072b7a8d51..9aa9343083d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -28,13 +28,11 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import com.google.common.collect.ImmutableSet;
@@ -288,6 +286,7 @@ public abstract class SubscriptionPrefetchingQueue {
}
protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event)
{
+ // TODO: consider memory usage
event.trySerializeCurrentResponse();
prefetchingQueue.add(event);
}
@@ -532,16 +531,14 @@ public abstract class SubscriptionPrefetchingQueue {
protected SubscriptionEvent generateSubscriptionPollErrorResponse(final
String errorMessage) {
// consider non-critical by default, meaning the client can retry
return new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.ERROR.getType(),
- new ErrorPayload(errorMessage, false),
- new SubscriptionCommitContext(
- IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
- PipeDataNodeAgent.runtime().getRebootTimes(),
- topicName,
- brokerId,
- INVALID_COMMIT_ID)));
+ SubscriptionPollResponseType.ERROR.getType(),
+ new ErrorPayload(errorMessage, false),
+ new SubscriptionCommitContext(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ PipeDataNodeAgent.runtime().getRebootTimes(),
+ topicName,
+ brokerId,
+ INVALID_COMMIT_ID));
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index ea7a652e461..09f41cf23cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -162,11 +163,7 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
- LOGGER.warn(
- "Subscription: SubscriptionPrefetchingTabletQueue {} ignore
TsFileInsertionEvent {} when prefetching.",
- this,
- event);
- return false;
+ return batches.onEvent((EnrichedEvent) event,
this::enqueueEventToPrefetchingQueue);
}
/////////////////////////////// stringify ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 64daaca4b4c..5d12eb490cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -240,10 +240,8 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
final SubscriptionEvent ev =
new SubscriptionEvent(
new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent)
event),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.FILE_INIT.getType(),
- new FileInitPayload(((PipeTsFileInsertionEvent)
event).getTsFile().getName()),
- commitContext));
+ ((PipeTsFileInsertionEvent) event).getTsFile(),
+ commitContext);
super.enqueueEventToPrefetchingQueue(ev);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index ad274023f6a..0e7fd04d009 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -20,34 +20,29 @@
package org.apache.iotdb.db.subscription.event;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
+import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEvents;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
+import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
+import
org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse;
+import
org.apache.iotdb.db.subscription.event.response.SubscriptionEventSingleResponse;
+import
org.apache.iotdb.db.subscription.event.response.SubscriptionEventTabletResponse;
+import
org.apache.iotdb.db.subscription.event.response.SubscriptionEventTsFileResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
-import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
@@ -58,81 +53,63 @@ public class SubscriptionEvent {
private static final long INVALID_TIMESTAMP = -1;
private final SubscriptionPipeEvents pipeEvents;
-
- private final SubscriptionPollResponse[] responses;
- private int currentResponseIndex = 0;
-
- private final ByteBuffer[] byteBuffers; // serialized responses
- private final SubscriptionCommitContext
- commitContext; // all responses have the same commit context
+ private final SubscriptionEventResponse<CachedSubscriptionPollResponse>
response;
+ private final SubscriptionCommitContext commitContext;
// lastPolledConsumerId is not used as a criterion for determining
pollability
private volatile String lastPolledConsumerId = null;
private final AtomicLong lastPolledTimestamp = new
AtomicLong(INVALID_TIMESTAMP);
private final AtomicLong committedTimestamp = new
AtomicLong(INVALID_TIMESTAMP);
+ // record file name for file payload
+ private String fileName;
+
/**
- * Constructs a {@link SubscriptionEvent} with an initial response.
- *
- * @param pipeEvents The underlying pipe events corresponding to this {@link
SubscriptionEvent}.
- * @param initialResponse The initial response which must be of type {@link
FileInitPayload}. This
- * indicates that subsequent responses need to be fetched using {@link
- * SubscriptionEvent#prefetchRemainingResponses()}.
+ * Constructs a {@link SubscriptionEvent} with the response type of {@link
+ * SubscriptionEventSingleResponse}.
*/
public SubscriptionEvent(
- final SubscriptionPipeEvents pipeEvents, final SubscriptionPollResponse
initialResponse) {
- this.pipeEvents = pipeEvents;
+ final short responseType,
+ final SubscriptionPollPayload payload,
+ final SubscriptionCommitContext commitContext) {
+ this.pipeEvents = new SubscriptionPipeEmptyEvent();
+ this.response = new SubscriptionEventSingleResponse(responseType, payload,
commitContext);
+ this.commitContext = commitContext;
+ }
- final int responseLength =
getResponseLength(initialResponse.getResponseType());
- this.responses = new SubscriptionPollResponse[responseLength];
- this.responses[0] = initialResponse;
+ @TestOnly
+ public SubscriptionEvent(final SubscriptionPollResponse response) {
+ this(response.getResponseType(), response.getPayload(),
response.getCommitContext());
+ }
- this.byteBuffers = new ByteBuffer[responseLength];
- this.commitContext = initialResponse.getCommitContext();
+ /**
+ * Constructs a {@link SubscriptionEvent} with the response type of {@link
+ * SubscriptionEventTabletResponse}.
+ */
+ public SubscriptionEvent(
+ final SubscriptionPipeTabletEventBatch batch, final
SubscriptionCommitContext commitContext) {
+ this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
+ this.response = new SubscriptionEventTabletResponse(batch, commitContext);
+ this.commitContext = commitContext;
}
/**
- * Constructs a {@link SubscriptionEvent} with a list of responses.
- *
- * @param pipeEvents The underlying pipe events corresponding to this {@link
SubscriptionEvent}.
- * @param responses A list of responses that can be of types {@link
TabletsPayload}, {@link
- * TerminationPayload}, or {@link ErrorPayload}. All responses are
already generated at the
- * time of construction, so {@link
SubscriptionEvent#prefetchRemainingResponses()} is not
- * required.
+ * Constructs a {@link SubscriptionEvent} with the response type of {@link
+ * SubscriptionEventTsFileResponse}.
*/
public SubscriptionEvent(
- final SubscriptionPipeEvents pipeEvents, final
List<SubscriptionPollResponse> responses) {
+ final SubscriptionPipeEvents pipeEvents,
+ final File tsFile,
+ final SubscriptionCommitContext commitContext) {
this.pipeEvents = pipeEvents;
+ this.response = new SubscriptionEventTsFileResponse(tsFile, commitContext);
+ this.commitContext = commitContext;
- final int responseLength = responses.size();
- this.responses = new SubscriptionPollResponse[responseLength];
- for (int i = 0; i < responseLength; i++) {
- this.responses[i] = responses.get(i);
- }
-
- this.byteBuffers = new ByteBuffer[responseLength];
- this.commitContext = this.responses[0].getCommitContext();
- }
-
- private int getResponseLength(final short responseType) {
- if (!Objects.equals(SubscriptionPollResponseType.FILE_INIT.getType(),
responseType)) {
- LOGGER.warn("unexpected response type: {}", responseType);
- return 1;
- }
- final long fileLength = pipeEvents.getTsFile().length();
- final long readFileBufferSize =
- SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
- final int length = (int) (fileLength / readFileBufferSize);
- // add for init, last piece and seal
- return (fileLength % readFileBufferSize != 0) ? length + 3 : length + 2;
+ this.fileName = tsFile.getName();
}
public SubscriptionPollResponse getCurrentResponse() {
- return getResponse(currentResponseIndex);
- }
-
- private SubscriptionPollResponse getResponse(final int index) {
- return responses[index];
+ return response.getCurrentResponse();
}
public SubscriptionCommitContext getCommitContext() {
@@ -158,7 +135,7 @@ public class SubscriptionEvent {
// event with invalid commit id is uncommittable
return false;
}
- return currentResponseIndex >= responses.length - 1;
+ return response.isCommittable();
}
public void ack() {
@@ -172,10 +149,12 @@ public class SubscriptionEvent {
*/
public void cleanUp() {
// reset serialized responses
- resetResponseByteBuffer(true);
+ response.cleanUp();
// clean up pipe events
pipeEvents.cleanUp();
+
+ // TODO: clean more fields
}
//////////////////////////// pollable ////////////////////////////
@@ -224,8 +203,8 @@ public class SubscriptionEvent {
}
public void nack() {
- // reset current response index
- currentResponseIndex = 0;
+ // nack response
+ response.nack();
// reset lastPolledTimestamp makes this event pollable
lastPolledTimestamp.set(INVALID_TIMESTAMP);
@@ -241,131 +220,31 @@ public class SubscriptionEvent {
//////////////////////////// prefetch & fetch ////////////////////////////
- /**
- * @param index the index of response to be prefetched
- */
- private void prefetchResponse(final int index) throws IOException {
- if (index >= responses.length || index <= 0) {
- return;
- }
-
- if (Objects.nonNull(responses[index])) {
- return;
- }
-
- final SubscriptionPollResponse previousResponse = this.getResponse(index -
1);
- final short responseType = previousResponse.getResponseType();
- final SubscriptionPollPayload payload = previousResponse.getPayload();
- if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
- LOGGER.warn("unexpected response type: {}", responseType);
- return;
- }
-
- switch (SubscriptionPollResponseType.valueOf(responseType)) {
- case FILE_INIT:
- responses[index] =
generateSubscriptionPollResponseWithPieceOrSealPayload(0);
- break;
- case FILE_PIECE:
- responses[index] =
- generateSubscriptionPollResponseWithPieceOrSealPayload(
- ((FilePiecePayload) payload).getNextWritingOffset());
- break;
- case FILE_SEAL:
- // not need to prefetch
- return;
- default:
- LOGGER.warn("unexpected message type: {}", responseType);
- }
- }
-
public void prefetchRemainingResponses() throws IOException {
- for (int currentIndex = currentResponseIndex;
- currentIndex < responses.length - 1;
- currentIndex++) {
- if (Objects.isNull(responses[currentIndex + 1])) {
- prefetchResponse(currentIndex + 1);
- return;
- }
- }
+ response.prefetchRemainingResponses();
}
public void fetchNextResponse() throws IOException {
- if (currentResponseIndex >= responses.length - 1) {
- LOGGER.warn("No more responses when fetching next response for {}, do
nothing.", this);
- return;
- }
- if (Objects.isNull(responses[currentResponseIndex + 1])) {
- prefetchRemainingResponses();
- }
- currentResponseIndex++;
+ response.fetchNextResponse();
}
//////////////////////////// byte buffer ////////////////////////////
public void trySerializeRemainingResponses() {
- for (int currentIndex = currentResponseIndex;
- currentIndex < responses.length - 1;
- currentIndex++) {
- if (Objects.nonNull(responses[currentIndex + 1]) &&
trySerializeResponse(currentIndex + 1)) {
- break;
- }
- }
- }
-
- public boolean trySerializeCurrentResponse() {
- return trySerializeResponse(currentResponseIndex);
+ // TODO: consider memory usage
+ response.trySerializeRemainingResponses();
}
- /**
- * @param index the index of response to be serialized
- * @return {@code true} if a serialization operation was actually performed
- */
- private boolean trySerializeResponse(final int index) {
- if (index >= responses.length) {
- return false;
- }
-
- if (Objects.isNull(responses[index])) {
- return false;
- }
-
- if (Objects.nonNull(byteBuffers[index])) {
- return false;
- }
-
- final Optional<ByteBuffer> optionalByteBuffer =
-
SubscriptionEventBinaryCache.getInstance().trySerialize(responses[index]);
- if (optionalByteBuffer.isPresent()) {
- byteBuffers[index] = optionalByteBuffer.get();
- return true;
- }
-
- return false;
+ public void trySerializeCurrentResponse() {
+ response.trySerializeCurrentResponse();
}
public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
- if (Objects.nonNull(byteBuffers[currentResponseIndex])) {
- return byteBuffers[currentResponseIndex];
- }
-
- return byteBuffers[currentResponseIndex] =
-
SubscriptionEventBinaryCache.getInstance().serialize(getCurrentResponse());
+ return response.getCurrentResponseByteBuffer();
}
- public void resetResponseByteBuffer(final boolean resetAll) {
- if (resetAll) {
- SubscriptionEventBinaryCache.getInstance()
- .invalidateAll(
-
Arrays.stream(responses).filter(Objects::nonNull).collect(Collectors.toList()));
- // maybe friendly for gc
- Arrays.fill(byteBuffers, null);
- } else {
- if (Objects.nonNull(responses[currentResponseIndex])) {
-
SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]);
- }
- // maybe friendly for gc
- byteBuffers[currentResponseIndex] = null;
- }
+ public void invalidateCurrentResponseByteBuffer() {
+ response.invalidateCurrentResponseByteBuffer();
}
public int getCurrentResponseSize() throws IOException {
@@ -374,69 +253,24 @@ public class SubscriptionEvent {
return byteBuffer.limit() - byteBuffer.position();
}
- /////////////////////////////// tsfile ///////////////////////////////
-
- private @NonNull SubscriptionPollResponse
generateSubscriptionPollResponseWithPieceOrSealPayload(
- final long writingOffset) throws IOException {
- final File tsFile = pipeEvents.getTsFile();
-
- final long readFileBufferSize =
- SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
- final byte[] readBuffer = new byte[(int) readFileBufferSize];
- try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
- while (true) {
- reader.seek(writingOffset);
- final int readLength = reader.read(readBuffer);
- if (readLength == -1) {
- break;
- }
-
- final byte[] filePiece =
- readLength == readFileBufferSize
- ? readBuffer
- : Arrays.copyOfRange(readBuffer, 0, readLength);
-
- // generate subscription poll response with piece payload
- return new SubscriptionPollResponse(
- SubscriptionPollResponseType.FILE_PIECE.getType(),
- new FilePiecePayload(tsFile.getName(), writingOffset + readLength,
filePiece),
- commitContext);
- }
-
- // generate subscription poll response with seal payload
- return new SubscriptionPollResponse(
- SubscriptionPollResponseType.FILE_SEAL.getType(),
- new FileSealPayload(tsFile.getName(), tsFile.length()),
- commitContext);
- }
- }
+ //////////////////////////// tsfile ////////////////////////////
public String getFileName() {
- return pipeEvents.getTsFile().getName();
+ return fileName;
}
- /////////////////////////////// APIs provided for metric framework
///////////////////////////////
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
public int getPipeEventCount() {
return pipeEvents.getPipeEventCount();
}
- /////////////////////////////// object ///////////////////////////////
+ //////////////////////////// object ////////////////////////////
@Override
public String toString() {
- return "SubscriptionEvent{responses="
- + Arrays.toString(responses)
- + ", responses' byte buffer size="
- + Arrays.stream(byteBuffers)
- .map(
- byteBuffer ->
- Objects.isNull(byteBuffer)
- ? "<unknown>"
- : byteBuffer.limit() - byteBuffer.position())
- .collect(Collectors.toList())
- + ", currentResponseIndex="
- + currentResponseIndex
+ return "SubscriptionEvent{response="
+ + response
+ ", lastPolledConsumerId="
+ lastPolledConsumerId
+ ", lastPolledTimestamp="
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 758d9b31567..a396ff8ee2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -22,17 +22,25 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
public abstract class SubscriptionPipeEventBatch {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPipeEventBatch.class);
+
private final int regionId;
protected final SubscriptionPrefetchingQueue prefetchingQueue;
@@ -40,6 +48,7 @@ public abstract class SubscriptionPipeEventBatch {
protected final long maxBatchSizeInBytes;
protected volatile List<SubscriptionEvent> events = null;
+ protected final List<EnrichedEvent> enrichedEvents = new ArrayList<>();
protected SubscriptionPipeEventBatch(
final int regionId,
@@ -52,27 +61,60 @@ public abstract class SubscriptionPipeEventBatch {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
}
+ /////////////////////////////// ack & clean ///////////////////////////////
+
+ public abstract void ack();
+
+ public abstract void cleanUp();
+
+ /////////////////////////////// APIs ///////////////////////////////
+
/**
* @return {@code true} if there are subscription events consumed.
*/
- public abstract boolean onEvent(final Consumer<SubscriptionEvent> consumer)
throws Exception;
+ protected synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer)
+ throws Exception {
+ if (shouldEmit() && !enrichedEvents.isEmpty()) {
+ if (Objects.isNull(events)) {
+ events = generateSubscriptionEvents();
+ }
+ if (Objects.nonNull(events)) {
+ events.forEach(consumer);
+ return true;
+ }
+ return false;
+ }
+ return false;
+ }
/**
* @return {@code true} if there are subscription events consumed.
*/
- public abstract boolean onEvent(
+ protected synchronized boolean onEvent(
final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent>
consumer)
- throws Exception;
+ throws Exception {
+ if (event instanceof TabletInsertionEvent) {
+ onTabletInsertionEvent((TabletInsertionEvent) event); // no exceptions
will be thrown
+ enrichedEvents.add(event);
+ } else if (event instanceof TsFileInsertionEvent) {
+ onTsFileInsertionEvent((TsFileInsertionEvent) event);
+ enrichedEvents.add(event);
+ } else {
+ LOGGER.warn(
+ "SubscriptionPipeEventBatch {} ignore EnrichedEvent {} when
batching.", this, event);
+ }
+ return onEvent(consumer);
+ }
- public abstract void cleanUp();
+ /////////////////////////////// utility ///////////////////////////////
- public int getRegionId() {
- return regionId;
- }
+ protected abstract void onTabletInsertionEvent(final TabletInsertionEvent
event) throws Exception;
- public boolean isSealed() {
- return Objects.nonNull(events);
- }
+ protected abstract void onTsFileInsertionEvent(final TsFileInsertionEvent
event) throws Exception;
+
+ protected abstract boolean shouldEmit();
+
+ protected abstract List<SubscriptionEvent> generateSubscriptionEvents()
throws Exception;
/////////////////////////////// stringify ///////////////////////////////
@@ -82,6 +124,28 @@ public abstract class SubscriptionPipeEventBatch {
result.put("prefetchingQueue",
prefetchingQueue.coreReportMessage().toString());
result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
+ // TODO: stringify subscription events?
+ result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
return result;
}
+
+ private static String formatEnrichedEvents(
+ final List<EnrichedEvent> enrichedEvents, final int threshold) {
+ final List<String> eventMessageList =
+ enrichedEvents.stream()
+ .limit(threshold)
+ .map(EnrichedEvent::coreReportMessage)
+ .collect(Collectors.toList());
+ if (eventMessageList.size() > threshold) {
+ eventMessageList.add(
+ String.format("omit the remaining %s event(s)...",
eventMessageList.size() - threshold));
+ }
+ return eventMessageList.toString();
+ }
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ public int getPipeEventCount() {
+ return enrichedEvents.size();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 871b1c12587..b54607c868b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -20,43 +20,31 @@
package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.tsfile.write.record.Tablet;
-import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
- private static final long READ_TABLET_BUFFER_SIZE =
- SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
-
- private final List<EnrichedEvent> enrichedEvents = new ArrayList<>();
- private final List<Tablet> tablets = new ArrayList<>();
-
+ private volatile List<Tablet> tablets = new LinkedList<>();
private long firstEventProcessingTime = Long.MIN_VALUE;
private long totalBufferSize = 0;
@@ -68,28 +56,36 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
}
- @Override
- public synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer) {
- if (shouldEmit() && !enrichedEvents.isEmpty()) {
- if (Objects.isNull(events)) {
- events = generateSubscriptionEvents();
+ public LinkedList<Tablet> moveTablets() {
+ if (Objects.isNull(tablets)) {
+ tablets = new ArrayList<>();
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ if (enrichedEvent instanceof TsFileInsertionEvent) {
+ onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent);
+ } else if (enrichedEvent instanceof TabletInsertionEvent) {
+ onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent);
+ } else {
+ LOGGER.warn(
+ "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {}
when moving.",
+ this,
+ enrichedEvent);
+ }
}
- if (Objects.nonNull(events)) {
- events.forEach(consumer);
- return true;
- }
- return false;
}
- return false;
+ final LinkedList<Tablet> result = new LinkedList<>(tablets);
+ firstEventProcessingTime = Long.MIN_VALUE;
+ totalBufferSize = 0;
+ tablets = null; // reset to null for gc & subsequent move
+ return result;
}
+ /////////////////////////////// ack & clean ///////////////////////////////
+
@Override
- public synchronized boolean onEvent(
- final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent>
consumer) {
- if (event instanceof TabletInsertionEvent) {
- onEventInternal((TabletInsertionEvent) event); // no exceptions will be
thrown
+ public synchronized void ack() {
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
}
- return onEvent(consumer);
}
@Override
@@ -99,65 +95,42 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
- tablets.clear();
+ if (Objects.nonNull(tablets)) {
+ tablets.clear();
+ }
}
- public synchronized void ack() {
- for (final EnrichedEvent enrichedEvent : enrichedEvents) {
- enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+ /////////////////////////////// utility ///////////////////////////////
+
+ @Override
+ protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
+ constructBatch(event);
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
}
}
- /////////////////////////////// utility ///////////////////////////////
+ @Override
+ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
+ for (final TabletInsertionEvent tabletInsertionEvent :
event.toTabletInsertionEvents()) {
+ onTabletInsertionEvent(tabletInsertionEvent);
+ }
+ }
- private List<SubscriptionEvent> generateSubscriptionEvents() {
+ @Override
+ protected List<SubscriptionEvent> generateSubscriptionEvents() {
if (tablets.isEmpty()) {
return null;
}
- final SubscriptionCommitContext commitContext =
- prefetchingQueue.generateSubscriptionCommitContext();
- final List<SubscriptionPollResponse> responses = new ArrayList<>();
- final List<Tablet> currentTablets = new ArrayList<>();
- long currentTotalBufferSize = 0;
- for (final Tablet tablet : tablets) {
- final long bufferSize =
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
- if (bufferSize > READ_TABLET_BUFFER_SIZE) {
- LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
- responses.add(
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(Collections.singletonList(tablet),
responses.size() + 1),
- commitContext));
- continue;
- }
- if (currentTotalBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
- responses.add(
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
responses.size() + 1),
- commitContext));
- currentTablets.clear();
- currentTotalBufferSize = 0;
- }
- currentTablets.add(tablet);
- currentTotalBufferSize += bufferSize;
- }
- responses.add(
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
-tablets.size()),
- commitContext));
return Collections.singletonList(
- new SubscriptionEvent(new SubscriptionPipeTabletBatchEvents(this),
responses));
+ new SubscriptionEvent(this,
prefetchingQueue.generateSubscriptionCommitContext()));
}
- private void onEventInternal(final TabletInsertionEvent event) {
- constructBatch(event);
- enrichedEvents.add((EnrichedEvent) event);
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
- }
+ @Override
+ protected boolean shouldEmit() {
+ return totalBufferSize >= maxBatchSizeInBytes
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
private void constructBatch(final TabletInsertionEvent event) {
@@ -173,11 +146,6 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
.orElse(0L);
}
- private boolean shouldEmit() {
- return totalBufferSize >= maxBatchSizeInBytes
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
- }
-
private List<Tablet> convertToTablets(final TabletInsertionEvent
tabletInsertionEvent) {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
return ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).convertToTablets();
@@ -203,30 +171,11 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
protected Map<String, String> coreReportMessage() {
final Map<String, String> coreReportMessage = super.coreReportMessage();
- coreReportMessage.put("enrichedEvents",
formatEnrichedEvents(enrichedEvents, 4));
- coreReportMessage.put("size of tablets", String.valueOf(tablets.size()));
+ coreReportMessage.put(
+ "size of tablets",
+ (Objects.nonNull(tablets) ? String.valueOf(tablets.size()) :
"<unknown>"));
coreReportMessage.put("firstEventProcessingTime",
String.valueOf(firstEventProcessingTime));
coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
return coreReportMessage;
}
-
- private static String formatEnrichedEvents(
- final List<EnrichedEvent> enrichedEvents, final int threshold) {
- final List<String> eventMessageList =
- enrichedEvents.stream()
- .limit(threshold)
- .map(EnrichedEvent::coreReportMessage)
- .collect(Collectors.toList());
- if (eventMessageList.size() > threshold) {
- eventMessageList.add(
- String.format("omit the remaining %s event(s)...",
eventMessageList.size() - threshold));
- }
- return eventMessageList.toString();
- }
-
- //////////////////////////// APIs provided for metric framework
////////////////////////////
-
- public int getPipeEventCount() {
- return enrichedEvents.size();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index e38887e19f2..dc96fc476da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -25,25 +25,24 @@ import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueu
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionPipeTsFileEventBatch.class);
+
private final PipeTabletEventTsFileBatch batch;
- private final List<EnrichedEvent> enrichedEvents;
public SubscriptionPipeTsFileEventBatch(
final int regionId,
@@ -52,36 +51,11 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
final long maxBatchSizeInBytes) {
super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs,
maxBatchSizeInBytes);
- this.enrichedEvents = new ArrayList<>();
- }
-
- @Override
- public synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer) throws Exception {
- if (batch.shouldEmit() && !enrichedEvents.isEmpty()) {
- if (Objects.isNull(events)) {
- events = generateSubscriptionEvents();
- }
- if (Objects.nonNull(events)) {
- events.forEach(consumer);
- return true;
- }
- return false;
- }
- return false;
}
@Override
- public synchronized boolean onEvent(
- final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent>
consumer)
- throws Exception {
- if (event instanceof TabletInsertionEvent) {
- batch.onEvent((TabletInsertionEvent) event); // no exceptions will be
thrown
- enrichedEvents.add(event);
- event.decreaseReferenceCount(
- SubscriptionPipeTsFileEventBatch.class.getName(),
- false); // missing releaseLastEvent decreases reference count
- }
- return onEvent(consumer);
+ public synchronized void ack() {
+ batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
}
@Override
@@ -91,13 +65,27 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
enrichedEvents.clear();
}
- public synchronized void ack() {
- batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
+ /////////////////////////////// utility ///////////////////////////////
+
+ @Override
+ protected void onTabletInsertionEvent(final TabletInsertionEvent event)
throws Exception {
+ batch.onEvent(event); // no exceptions will be thrown
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(
+ SubscriptionPipeTsFileEventBatch.class.getName(),
+ false); // missing releaseLastEvent decreases reference count
}
- /////////////////////////////// utility ///////////////////////////////
+ @Override
+ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {}
when batching.",
+ this,
+ event);
+ }
- private List<SubscriptionEvent> generateSubscriptionEvents() throws
Exception {
+ @Override
+ protected List<SubscriptionEvent> generateSubscriptionEvents() throws
Exception {
if (batch.isEmpty()) {
return null;
}
@@ -110,15 +98,16 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
prefetchingQueue.generateSubscriptionCommitContext();
events.add(
new SubscriptionEvent(
- new SubscriptionPipeTsFileBatchEvents(this, tsFile,
referenceCount),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.FILE_INIT.getType(),
- new FileInitPayload(tsFile.getName()),
- commitContext)));
+ new SubscriptionPipeTsFileBatchEvents(this, referenceCount),
tsFile, commitContext));
}
return events;
}
+ @Override
+ protected boolean shouldEmit() {
+ return batch.shouldEmit();
+ }
+
/////////////////////////////// stringify ///////////////////////////////
@Override
@@ -132,10 +121,4 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
coreReportMessage.put("batch", batch.toString());
return coreReportMessage;
}
-
- //////////////////////////// APIs provided for metric framework
////////////////////////////
-
- public int getPipeEventCount() {
- return enrichedEvents.size();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
new file mode 100644
index 00000000000..54dbcceaccb
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.cache;
+
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class CachedSubscriptionPollResponse extends SubscriptionPollResponse {
+
+ private volatile ByteBuffer byteBuffer; // cached serialized response
+
+ public CachedSubscriptionPollResponse(
+ final short responseType,
+ final SubscriptionPollPayload payload,
+ final SubscriptionCommitContext commitContext) {
+ super(responseType, payload, commitContext);
+ }
+
+ public CachedSubscriptionPollResponse(final SubscriptionPollResponse
response) {
+ super(response.getResponseType(), response.getPayload(),
response.getCommitContext());
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return byteBuffer;
+ }
+
+ public void invalidateByteBuffer() {
+ // maybe friendly for gc
+ byteBuffer = null;
+ }
+
+ public static ByteBuffer serialize(final CachedSubscriptionPollResponse
response)
+ throws IOException {
+ return response.serialize();
+ }
+
+ private ByteBuffer serialize() throws IOException {
+ return Objects.nonNull(byteBuffer)
+ ? byteBuffer
+ : (byteBuffer = SubscriptionPollResponse.serialize(this));
+ }
+
+ /////////////////////////////// stringify ///////////////////////////////
+
+ @Override
+ public String toString() {
+ return "CachedSubscriptionPollResponse" + coreReportMessage();
+ }
+
+ @Override
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> coreReportMessage = super.coreReportMessage();
+ coreReportMessage.put(
+ "sizeof(byteBuffer)",
+ Objects.isNull(byteBuffer)
+ ? "<unknown>"
+ : String.valueOf(byteBuffer.limit() - byteBuffer.position()));
+ return coreReportMessage;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
similarity index 78%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 6e62a819c69..02a634c9c19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.subscription.event;
+package org.apache.iotdb.db.subscription.event.cache;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -35,65 +35,66 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
-/** This class is used to cache {@link SubscriptionPollResponse} in {@link
SubscriptionEvent}. */
-class SubscriptionEventBinaryCache {
+/**
+ * This class is used to control memory usage of cache {@link
SubscriptionPollResponse} in {@link
+ * CachedSubscriptionPollResponse}.
+ */
+public class SubscriptionPollResponseCache {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionEventBinaryCache.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPollResponseCache.class);
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
- private final LoadingCache<SubscriptionPollResponse, ByteBuffer> cache;
+ private final LoadingCache<CachedSubscriptionPollResponse, ByteBuffer> cache;
- ByteBuffer serialize(final SubscriptionPollResponse response) throws
IOException {
+ public ByteBuffer serialize(final CachedSubscriptionPollResponse response)
throws IOException {
try {
return this.cache.get(response);
} catch (final Exception e) {
LOGGER.warn(
- "SubscriptionEventBinaryCache raised an exception while serializing
SubscriptionPollResponse: {}",
+ "SubscriptionEventBinaryCache raised an exception while serializing
CachedSubscriptionPollResponse: {}",
response,
e);
throw new IOException(e);
}
}
- Optional<ByteBuffer> trySerialize(final SubscriptionPollResponse response) {
+ public Optional<ByteBuffer> trySerialize(final
CachedSubscriptionPollResponse response) {
try {
return Optional.of(serialize(response));
} catch (final IOException e) {
LOGGER.warn(
- "Subscription: something unexpected happened when serializing
SubscriptionPollResponse: {}",
+ "Subscription: something unexpected happened when serializing
CachedSubscriptionPollResponse: {}",
response,
e);
return Optional.empty();
}
}
- void invalidate(final SubscriptionPollResponse response) {
+ public void invalidate(final CachedSubscriptionPollResponse response) {
this.cache.invalidate(response);
- }
-
- void invalidateAll(final Iterable<SubscriptionPollResponse> responses) {
- this.cache.invalidateAll(responses);
+ response.invalidateByteBuffer();
}
//////////////////////////// singleton ////////////////////////////
private static class SubscriptionEventBinaryCacheHolder {
- private static final SubscriptionEventBinaryCache INSTANCE = new
SubscriptionEventBinaryCache();
+ private static final SubscriptionPollResponseCache INSTANCE =
+ new SubscriptionPollResponseCache();
private SubscriptionEventBinaryCacheHolder() {
// empty constructor
}
}
- static SubscriptionEventBinaryCache getInstance() {
- return
SubscriptionEventBinaryCache.SubscriptionEventBinaryCacheHolder.INSTANCE;
+ public static SubscriptionPollResponseCache getInstance() {
+ return
SubscriptionPollResponseCache.SubscriptionEventBinaryCacheHolder.INSTANCE;
}
- private SubscriptionEventBinaryCache() {
+ private SubscriptionPollResponseCache() {
final long initMemorySizeInBytes =
- PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
+ PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 5;
final long maxMemorySizeInBytes =
(long)
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
@@ -129,13 +130,13 @@ class SubscriptionEventBinaryCache {
Caffeine.newBuilder()
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
.weigher(
- (Weigher<SubscriptionPollResponse, ByteBuffer>)
+ (Weigher<CachedSubscriptionPollResponse, ByteBuffer>)
(message, buffer) -> {
// TODO: overflow
return (int) (buffer.capacity() *
memoryUsageCheatFactor.get());
})
.recordStats() // TODO: metrics
// NOTE: lambda CAN NOT be replaced with method reference
- .build(response -> SubscriptionPollResponse.serialize(response));
+ .build(response ->
CachedSubscriptionPollResponse.serialize(response));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index 3e74e03d8f4..ef6cc39a60c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -19,15 +19,8 @@
package org.apache.iotdb.db.subscription.event.pipe;
-import java.io.File;
-
public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents {
- @Override
- public File getTsFile() {
- return null;
- }
-
@Override
public void ack() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 489c4cf8120..05f011699ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -19,15 +19,8 @@
package org.apache.iotdb.db.subscription.event.pipe;
-import java.io.File;
-
public interface SubscriptionPipeEvents {
- /**
- * @return {@code null} if the pipe events do not contain the corresponding
tsfile.
- */
- File getTsFile();
-
void ack();
void cleanUp();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 226367405eb..f518c7b0194 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.subscription.event.pipe;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
-import java.io.File;
-
public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTabletEventBatch batch;
@@ -31,11 +29,6 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
this.batch = batch;
}
- @Override
- public File getTsFile() {
- return null;
- }
-
@Override
public void ack() {
batch.ack();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 5cae21f5ac8..16151a16a50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -21,31 +21,21 @@ package org.apache.iotdb.db.subscription.event.pipe;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
-import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTsFileEventBatch batch;
- private final File tsFile;
private final AtomicInteger referenceCount; // shared between the same batch
private final int count; // snapshot the initial reference count, used for
event count calculation
public SubscriptionPipeTsFileBatchEvents(
- final SubscriptionPipeTsFileEventBatch batch,
- final File tsFile,
- final AtomicInteger referenceCount) {
+ final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger
referenceCount) {
this.batch = batch;
- this.tsFile = tsFile;
this.referenceCount = referenceCount;
this.count = Math.max(1, referenceCount.get());
}
- @Override
- public File getTsFile() {
- return tsFile;
- }
-
@Override
public void ack() {
if (referenceCount.decrementAndGet() == 0) {
@@ -66,8 +56,6 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
public String toString() {
return "SubscriptionPipeTsFileBatchEvents{batch="
+ batch
- + ", tsFile="
- + tsFile
+ ", referenceCount="
+ referenceCount
+ "}";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 111006fa6d3..8effb654d73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.subscription.event.pipe;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import java.io.File;
-
public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents {
private final PipeTsFileInsertionEvent tsFileInsertionEvent;
@@ -31,11 +29,6 @@ public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents
this.tsFileInsertionEvent = tsFileInsertionEvent;
}
- @Override
- public File getTsFile() {
- return tsFileInsertionEvent.getTsFile();
- }
-
@Override
public void ack() {
tsFileInsertionEvent.decreaseReferenceCount(this.getClass().getName(),
true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
new file mode 100644
index 00000000000..07dfaabdf4f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * The {@code SubscriptionEventExtendableResponse} class represents a
subscription event response
+ * that can dynamically change as new responses are fetched. It maintains a
list of {@link
+ * CachedSubscriptionPollResponse} objects and provides methods for managing
and serializing these
+ * responses.
+ */
+public abstract class SubscriptionEventExtendableResponse
+ implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
+
+ private final Deque<CachedSubscriptionPollResponse> responses;
+ protected volatile boolean hasNoMore = false;
+
+ protected SubscriptionEventExtendableResponse() {
+ this.responses = new ConcurrentLinkedDeque<>();
+ }
+
+ @Override
+ public CachedSubscriptionPollResponse getCurrentResponse() {
+ return peekFirst();
+ }
+
+ @Override
+ public void fetchNextResponse() throws IOException {
+ prefetchRemainingResponses();
+ if (Objects.isNull(poll())) {
+ LOGGER.warn(
+ "SubscriptionEventExtendableResponse {} is empty when fetching next
response (broken invariant)",
+ this);
+ }
+ }
+
+ @Override
+ public void trySerializeCurrentResponse() {
+
SubscriptionPollResponseCache.getInstance().trySerialize(getCurrentResponse());
+ }
+
+ @Override
+ public void trySerializeRemainingResponses() {
+ responses.stream()
+ .skip(1)
+ .filter(response -> Objects.isNull(response.getByteBuffer()))
+ .findFirst()
+ .ifPresent(response ->
SubscriptionPollResponseCache.getInstance().trySerialize(response));
+ }
+
+ @Override
+ public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
+ return
SubscriptionPollResponseCache.getInstance().serialize(getCurrentResponse());
+ }
+
+ @Override
+ public void invalidateCurrentResponseByteBuffer() {
+
SubscriptionPollResponseCache.getInstance().invalidate(getCurrentResponse());
+ }
+
+ @Override
+ public void cleanUp() {
+ CachedSubscriptionPollResponse response;
+ while (Objects.nonNull(response = poll())) {
+ SubscriptionPollResponseCache.getInstance().invalidate(response);
+ }
+
+ hasNoMore = false;
+ }
+
+ @Override
+ public boolean isCommittable() {
+ return hasNoMore && size() == 1;
+ }
+
+ /////////////////////////////// utility ///////////////////////////////
+
+ protected void offer(final CachedSubscriptionPollResponse response) {
+ responses.addLast(response);
+ }
+
+ protected CachedSubscriptionPollResponse poll() {
+ return responses.isEmpty() ? null : responses.removeFirst();
+ }
+
+ protected CachedSubscriptionPollResponse peekFirst() {
+ return responses.isEmpty() ? null : responses.getFirst();
+ }
+
+ protected CachedSubscriptionPollResponse peekLast() {
+ return responses.isEmpty() ? null : responses.getLast();
+ }
+
+ protected int size() {
+ return responses.size();
+ }
+
+ protected boolean isEmpty() {
+ return responses.isEmpty();
+ }
+
+ /////////////////////////////// stringify ///////////////////////////////
+
+ @Override
+ public String toString() {
+ return "SubscriptionEventExtendableResponse" + coreReportMessage();
+ }
+
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> result = new HashMap<>();
+ final CachedSubscriptionPollResponse currentResponse =
getCurrentResponse();
+ result.put(
+ "currentResponse",
+ Objects.nonNull(currentResponse) ? currentResponse.toString() :
"<unknown>");
+ result.put("hasNoMore", String.valueOf(hasNoMore));
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index 489c4cf8120..211f0905afa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -17,22 +17,36 @@
* under the License.
*/
-package org.apache.iotdb.db.subscription.event.pipe;
+package org.apache.iotdb.db.subscription.event.response;
-import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
-public interface SubscriptionPipeEvents {
+public interface SubscriptionEventResponse<E> {
- /**
- * @return {@code null} if the pipe events do not contain the corresponding
tsfile.
- */
- File getTsFile();
+ /////////////////////////////// response ///////////////////////////////
- void ack();
+ E getCurrentResponse();
- void cleanUp();
+ void prefetchRemainingResponses() throws IOException;
+
+ void fetchNextResponse() throws IOException;
+
+ /////////////////////////////// byte buffer ///////////////////////////////
+
+ void trySerializeCurrentResponse();
+
+ void trySerializeRemainingResponses();
- //////////////////////////// APIs provided for metric framework
////////////////////////////
+ ByteBuffer getCurrentResponseByteBuffer() throws IOException;
+
+ void invalidateCurrentResponseByteBuffer();
+
+ /////////////////////////////// lifecycle ///////////////////////////////
+
+ void nack();
+
+ void cleanUp();
- int getPipeEventCount();
+ boolean isCommittable();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
new file mode 100644
index 00000000000..dbc48ebda00
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The {@link SubscriptionEventSingleResponse} class represents a single
subscription event response
+ * that wraps a cached {@link SubscriptionPollResponse}. The actual payload of
the response can be
+ * either a {@link TerminationPayload} or an {@link ErrorPayload}.
+ */
+public class SubscriptionEventSingleResponse
+ implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
+
+ private final CachedSubscriptionPollResponse response;
+
+ public SubscriptionEventSingleResponse(
+ final short responseType,
+ final SubscriptionPollPayload payload,
+ final SubscriptionCommitContext commitContext) {
+ this.response = new CachedSubscriptionPollResponse(responseType, payload,
commitContext);
+ }
+
+ public SubscriptionEventSingleResponse(final SubscriptionPollResponse
response) {
+ this.response = new CachedSubscriptionPollResponse(response);
+ }
+
+ @Override
+ public CachedSubscriptionPollResponse getCurrentResponse() {
+ return response;
+ }
+
+ @Override
+ public void prefetchRemainingResponses() {
+ // do nothing
+ }
+
+ @Override
+ public void fetchNextResponse() {
+ // do nothing
+ }
+
+ @Override
+ public void trySerializeCurrentResponse() {
+ SubscriptionPollResponseCache.getInstance().trySerialize(response);
+ }
+
+ @Override
+ public void trySerializeRemainingResponses() {
+ // do nothing
+ }
+
+ @Override
+ public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
+ return SubscriptionPollResponseCache.getInstance().serialize(response);
+ }
+
+ @Override
+ public void invalidateCurrentResponseByteBuffer() {
+ SubscriptionPollResponseCache.getInstance().invalidate(response);
+ }
+
+ @Override
+ public void nack() {
+ invalidateCurrentResponseByteBuffer();
+ }
+
+ @Override
+ public void cleanUp() {
+ invalidateCurrentResponseByteBuffer();
+ }
+
+ @Override
+ public boolean isCommittable() {
+ return true;
+ }
+
+ /////////////////////////////// stringify ///////////////////////////////
+
+ @Override
+ public String toString() {
+ return "SubscriptionEventSingleResponse" + coreReportMessage();
+ }
+
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> result = new HashMap<>();
+ final CachedSubscriptionPollResponse currentResponse =
getCurrentResponse();
+ result.put(
+ "currentResponse",
+ Objects.nonNull(currentResponse) ? currentResponse.toString() :
"<unknown>");
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
new file mode 100644
index 00000000000..4fd13517891
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The {@code SubscriptionEventTabletResponse} class extends {@link
+ * SubscriptionEventExtendableResponse} to handle subscription responses
specifically for tablet
+ * data. The actual payload of the response includes a {@link TabletsPayload},
which contains the
+ * tablet information being processed.
+ */
+public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendableResponse {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
+
+ private static final long READ_TABLET_BUFFER_SIZE =
+ SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
+
+ private final SubscriptionPipeTabletEventBatch batch;
+ private final SubscriptionCommitContext commitContext;
+
+ private volatile LinkedList<Tablet> tablets;
+ private volatile int tabletsSize;
+ private final AtomicInteger nextOffset = new AtomicInteger(0);
+
+ public SubscriptionEventTabletResponse(
+ final SubscriptionPipeTabletEventBatch batch, final
SubscriptionCommitContext commitContext) {
+ this.batch = batch;
+ this.commitContext = commitContext;
+
+ init(batch);
+ }
+
+ @Override
+ public void prefetchRemainingResponses() {
+ if (hasNoMore) {
+ return;
+ }
+
+ offer(generateNextTabletResponse());
+ }
+
+ @Override
+ public void nack() {
+ cleanUp();
+ init(batch);
+ }
+
+ @Override
+ public void cleanUp() {
+ super.cleanUp();
+
+ tablets = null;
+ tabletsSize = 0;
+ nextOffset.set(0);
+ }
+
+ /////////////////////////////// utility ///////////////////////////////
+
+ private void init(final SubscriptionPipeTabletEventBatch batch) {
+ if (!isEmpty()) {
+ LOGGER.warn(
+ "SubscriptionEventTabletResponse {} is not empty when initializing
(broken invariant)",
+ this);
+ return;
+ }
+
+ tablets = batch.moveTablets();
+ tabletsSize = tablets.size();
+ offer(generateNextTabletResponse());
+ }
+
+ private synchronized CachedSubscriptionPollResponse
generateNextTabletResponse() {
+ final List<Tablet> currentTablets = new ArrayList<>();
+ final AtomicLong currentTotalBufferSize = new AtomicLong();
+
+ Tablet currentTablet;
+ while (!tablets.isEmpty() && Objects.nonNull(currentTablet =
tablets.removeFirst())) {
+ final long bufferSize =
PipeMemoryWeightUtil.calculateTabletSizeInBytes(currentTablet);
+ if (bufferSize > READ_TABLET_BUFFER_SIZE) {
+ LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
+ tablets.addAll(currentTablets); // re-enqueue previous tablets
+ currentTablets.clear();
+ currentTotalBufferSize.set(0);
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(
+ Collections.singletonList(currentTablet),
nextOffset.incrementAndGet()),
+ commitContext);
+ }
+ if (currentTotalBufferSize.get() + bufferSize > READ_TABLET_BUFFER_SIZE)
{
+ final CachedSubscriptionPollResponse response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
+ tablets.add(currentTablet); // re-enqueue current tablet
+ currentTablets.clear();
+ currentTotalBufferSize.set(0);
+ return response;
+ }
+ currentTablets.add(currentTablet);
+ currentTotalBufferSize.addAndGet(bufferSize);
+ }
+
+ final CachedSubscriptionPollResponse response;
+ if (currentTablets.isEmpty()) {
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(Collections.emptyList(), -tabletsSize),
+ commitContext);
+ hasNoMore = true;
+ } else {
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
+ }
+ currentTablets.clear();
+ currentTotalBufferSize.set(0);
+ return response;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
new file mode 100644
index 00000000000..01b6611dd06
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The {@code SubscriptionEventTsFileResponse} class extends {@link
+ * SubscriptionEventExtendableResponse} to manage subscription responses
related to time series
+ * files. The actual payload can include {@link FileInitPayload}, {@link
FilePiecePayload}, and
+ * {@link FileSealPayload}, allowing for detailed control over file data
streaming.
+ */
+public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendableResponse {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionEventTsFileResponse.class);
+
+ private final File tsFile;
+ private final SubscriptionCommitContext commitContext;
+
+ public SubscriptionEventTsFileResponse(
+ final File tsFile, final SubscriptionCommitContext commitContext) {
+ super();
+
+ this.tsFile = tsFile;
+ this.commitContext = commitContext;
+
+ init();
+ }
+
+ @Override
+ public void prefetchRemainingResponses() throws IOException {
+ if (hasNoMore) {
+ return;
+ }
+
+ generateNextTsFileResponse().ifPresent(super::offer);
+ }
+
+ @Override
+ public void nack() {
+ cleanUp();
+ init();
+ }
+
+ @Override
+ public void cleanUp() {
+ super.cleanUp();
+ }
+
+ /////////////////////////////// utility ///////////////////////////////
+
+ private void init() {
+ if (!isEmpty()) {
+ LOGGER.warn(
+ "SubscriptionEventTsFileResponse {} is not empty when initializing
(broken invariant)",
+ this);
+ return;
+ }
+
+ offer(
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.FILE_INIT.getType(),
+ new FileInitPayload(tsFile.getName()),
+ commitContext));
+ }
+
+ private synchronized Optional<CachedSubscriptionPollResponse>
generateNextTsFileResponse()
+ throws IOException {
+ final SubscriptionPollResponse previousResponse = peekLast();
+ if (Objects.isNull(previousResponse)) {
+ LOGGER.warn(
+ "SubscriptionEventTsFileResponse {} is empty when generating next
response (broken invariant)",
+ this);
+ return Optional.empty();
+ }
+ final short responseType = previousResponse.getResponseType();
+ final SubscriptionPollPayload payload = previousResponse.getPayload();
+ if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
+ LOGGER.warn("unexpected response type: {}", responseType);
+ return Optional.empty();
+ }
+
+ switch (SubscriptionPollResponseType.valueOf(responseType)) {
+ case FILE_INIT:
+ return Optional.of(generateResponseWithPieceOrSealPayload(0));
+ case FILE_PIECE:
+ return Optional.of(
+ generateResponseWithPieceOrSealPayload(
+ ((FilePiecePayload) payload).getNextWritingOffset()));
+ case FILE_SEAL:
+ // not need to prefetch
+ break;
+ default:
+ LOGGER.warn("unexpected message type: {}", responseType);
+ }
+
+ return Optional.empty();
+ }
+
+ private @NonNull CachedSubscriptionPollResponse
generateResponseWithPieceOrSealPayload(
+ final long writingOffset) throws IOException {
+ final long readFileBufferSize =
+ SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
+ final byte[] readBuffer = new byte[(int) readFileBufferSize];
+ try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+ while (true) {
+ reader.seek(writingOffset);
+ final int readLength = reader.read(readBuffer);
+ if (readLength == -1) {
+ break;
+ }
+
+ final byte[] filePiece =
+ readLength == readFileBufferSize
+ ? readBuffer
+ : Arrays.copyOfRange(readBuffer, 0, readLength);
+
+ // generate subscription poll response with piece payload
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.FILE_PIECE.getType(),
+ new FilePiecePayload(tsFile.getName(), writingOffset + readLength,
filePiece),
+ commitContext);
+ }
+
+ // generate subscription poll response with seal payload
+ hasNoMore = true;
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.FILE_SEAL.getType(),
+ new FileSealPayload(tsFile.getName(), tsFile.length()),
+ commitContext);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 97401c73808..d6962d22543 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -421,7 +421,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
SubscriptionPrefetchingQueue.generatePrefetchingQueueId(
commitContext.getConsumerGroupId(),
commitContext.getTopicName()),
size);
- event.resetResponseByteBuffer(false);
+ event.invalidateCurrentResponseByteBuffer();
LOGGER.info(
"Subscription: consumer {} poll {} successfully with
request: {}",
consumerConfig,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
index 7b4e76fac76..248ceda2b6d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription;
import org.apache.iotdb.db.subscription.broker.SubscriptionStates;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
@@ -53,28 +52,24 @@ public class SubscriptionStatesTest {
final SubscriptionEvent event1 =
new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
payload,
new SubscriptionCommitContext(-1, -1, "topic1", "cg1", 0)));
final SubscriptionEvent event2 =
new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
payload,
new SubscriptionCommitContext(-1, -1, "topic2", "cg1", 0)));
final SubscriptionEvent event3 =
new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
payload,
new SubscriptionCommitContext(-1, -1, "topic3", "cg1", 0)));
final SubscriptionEvent event4 =
new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
payload,