This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6fe5c352b9 Subscription: improve end-to-end latency (#12925)
c6fe5c352b9 is described below
commit c6fe5c352b95883b17d11e45ad7994d0335fbf53
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Jul 16 17:36:41 2024 +0800
Subscription: improve end-to-end latency (#12925)
Specifically, this commit includes the following optimization strategies:
1. Each time the consumer polls, it may trigger a `trySealBatch` request
on the current batch.
2. Shorten the interval parameters for some consumer operations.
3. Optimize `SubscriptionPrefetchingTsFileQueue` lock contention.
---
.../rpc/subscription/config/ConsumerConstant.java | 3 +-
.../consumer/SubscriptionConsumer.java | 2 +-
.../consumer/SubscriptionPushConsumer.java | 6 +-
.../subscription/SubscriptionCoordinator.java | 6 +-
.../broker/SubscriptionPrefetchingQueue.java | 67 +++++---
.../broker/SubscriptionPrefetchingTabletQueue.java | 15 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 168 ++++++++++++---------
.../db/subscription/event/SubscriptionEvent.java | 10 +-
.../batch/SubscriptionPipeTabletEventBatch.java | 2 +-
.../batch/SubscriptionPipeTsFileEventBatch.java | 2 +-
10 files changed, 166 insertions(+), 115 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index d449b2f9741..3693e89c3bf 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -65,8 +65,7 @@ public class ConsumerConstant {
public static final String CONSUME_LISTENER_KEY = "consume-listener";
public static final String AUTO_POLL_INTERVAL_MS_KEY =
"auto-poll-interval-ms";
- public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
- public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L;
+ public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 100L;
public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms";
public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 633f63ae7a9..ddf8abee931 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -82,7 +82,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConsumer.class);
- private static final long SLEEP_NS = 1_000_000_000L;
+ private static final long SLEEP_NS = 100_000_000L; // 100ms
private final String username;
private final String password;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index e28cf3b7c7e..5143958b89a 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -102,8 +102,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
this.ackStrategy = ackStrategy;
this.consumeListener = consumeListener;
- this.autoPollIntervalMs =
- Math.max(autoPollIntervalMs,
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+ this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
this.autoPollTimeoutMs =
Math.max(autoPollTimeoutMs,
ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
}
@@ -290,8 +289,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
}
public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
- this.autoPollIntervalMs =
- Math.max(autoPollIntervalMs,
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+ this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
return this;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 2d72f3c6c70..c48f3e0f078 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -51,6 +51,8 @@ public class SubscriptionCoordinator {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionCoordinator.class);
private final ConfigManager configManager;
+
+ // NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE
private final SubscriptionInfo subscriptionInfo;
private final PipeTaskCoordinatorLock coordinatorLock;
@@ -122,7 +124,9 @@ public class SubscriptionCoordinator {
subscriptionMetaSyncer.stop();
}
- /** Caller should ensure that the method is called in the lock {@link
#tryLock}. */
+ /**
+ * Caller should ensure that the method is called in the write lock of
{@link #subscriptionInfo}.
+ */
public void updateLastSyncedVersion() {
subscriptionInfo.updateLastSyncedVersion();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 2d23950fe75..f88368d01c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -93,7 +93,7 @@ public abstract class SubscriptionPrefetchingQueue {
public SubscriptionEvent poll(final String consumerId) {
if (prefetchingQueue.isEmpty()) {
- tryPrefetch();
+ tryPrefetch(true);
}
final long size = prefetchingQueue.size();
@@ -140,14 +140,27 @@ public abstract class SubscriptionPrefetchingQueue {
public abstract void executePrefetch();
/**
- * prefetch at most one {@link SubscriptionEvent} from {@link
+ * Prefetch at most one {@link SubscriptionEvent} from {@link
* SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
- * SubscriptionPrefetchingQueue#prefetchingQueue}
+ * SubscriptionPrefetchingQueue#prefetchingQueue}.
+ *
+ * <p>It will continuously attempt to prefetch and generate a {@link
SubscriptionEvent} until
+ * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
+ *
+ * @param trySealBatchIfEmpty {@code true} if {@link
SubscriptionPrefetchingQueue#trySealBatch} is
+ * called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is
empty, {@code false}
+ * otherwise
*/
- protected void tryPrefetch() {
- Event event;
- while (Objects.nonNull(
- event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) {
+ protected void tryPrefetch(final boolean trySealBatchIfEmpty) {
+ while (!inputPendingQueue.isEmpty()) {
+ final Event event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
+ if (Objects.isNull(event)) {
+ // The event will be null in two cases:
+ // 1. The inputPendingQueue is empty.
+ // 2. The tsfile event has been deduplicated.
+ continue;
+ }
+
if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} only support
prefetch EnrichedEvent. Ignore {}.",
@@ -169,26 +182,35 @@ public abstract class SubscriptionPrefetchingQueue {
if (event instanceof TabletInsertionEvent) {
if (onEvent((TabletInsertionEvent) event)) {
- break;
+ return;
}
- } else if (event instanceof PipeTsFileInsertionEvent) {
+ continue;
+ }
+
+ if (event instanceof PipeTsFileInsertionEvent) {
if (onEvent((PipeTsFileInsertionEvent) event)) {
- break;
- }
- } else {
- // TODO:
- // - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
- // - UserDefinedEnrichedEvent: ignored?
- // - Others: events related to meta sync, safe to ignore
- LOGGER.info(
- "Subscription: SubscriptionPrefetchingQueue {} ignore
EnrichedEvent {} when prefetching.",
- this,
- event);
- if (trySealBatch()) {
- break;
+ return;
}
+ continue;
+ }
+
+ // TODO:
+ // - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
+ // - UserDefinedEnrichedEvent: ignored?
+ // - Others: events related to meta sync, safe to ignore
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent
{} when prefetching.",
+ this,
+ event);
+ if (trySealBatch()) {
+ return;
}
}
+
+ // At this moment, the inputPendingQueue is empty.
+ if (trySealBatchIfEmpty) {
+ trySealBatch();
+ }
}
/**
@@ -222,6 +244,7 @@ public abstract class SubscriptionPrefetchingQueue {
}
if (event.isCommitted()) {
+ event.cleanup();
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription
commit context {}, prefetching queue: {}",
event,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 3aa36425b88..b0f69da2e76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -83,7 +83,7 @@ class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQueue {
@Override
public void executePrefetch() {
- super.tryPrefetch();
+ super.tryPrefetch(false);
this.serializeEventsInQueue();
}
@@ -114,13 +114,18 @@ class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQueue {
@Override
protected boolean trySealBatch() {
+ final AtomicBoolean result = new AtomicBoolean(false);
currentBatchRef.getAndUpdate(
(batch) -> {
- sealBatch(batch);
- return new SubscriptionPipeTabletEventBatch(
- BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+ if (batch.shouldEmit()) {
+ sealBatch(batch);
+ result.set(true);
+ return new SubscriptionPipeTabletEventBatch(
+ BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+ }
+ return batch;
});
- return true;
+ return result.get();
}
private void sealBatch(final SubscriptionPipeTabletEventBatch batch) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index b0aed50b8bf..26fb19846b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,16 +97,11 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
@Override
public SubscriptionEvent poll(final String consumerId) {
+ // check before polling event from prefetching queue
if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
return null;
}
- final SubscriptionEvent pollableEvent =
getPollableOnTheFlySubscriptionTsFileEvent(consumerId);
- if (Objects.nonNull(pollableEvent)) {
- return pollableEvent;
- }
-
- // At this point, the event that might be polled should not have been
polled by any consumer.
final SubscriptionEvent event = super.poll(consumerId);
if (Objects.nonNull(event)) {
consumerIdToSubscriptionEventMap.put(consumerId, event);
@@ -113,10 +110,20 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
return event;
}
- public synchronized @NonNull SubscriptionEvent pollTsFile(
+ public @NonNull SubscriptionEvent pollTsFile(
final String consumerId, final String fileName, final long
writingOffset) {
// 1. Extract current event and check it
- final SubscriptionEvent event =
consumerIdToSubscriptionEventMap.get(consumerId);
+ final SubscriptionEvent event =
+ consumerIdToSubscriptionEventMap.compute(
+ consumerId,
+ (id, ev) -> {
+ if (Objects.nonNull(ev) && ev.isCommitted()) {
+ ev.cleanup();
+ return null; // remove this entry
+ }
+ return ev;
+ });
+
if (Objects.isNull(event)) {
final String errorMessage =
String.format(
@@ -126,17 +133,6 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
return generateSubscriptionPollErrorResponse(errorMessage);
}
- if (event.isCommitted()) {
- event.cleanup();
- consumerIdToSubscriptionEventMap.remove(consumerId);
- final String errorMessage =
- String.format(
- "SubscriptionEvent %s related to TsFile is committed, consumer:
%s, writing offset: %s, prefetching queue: %s",
- event, consumerId, writingOffset, this);
- LOGGER.warn(errorMessage);
- return generateSubscriptionPollErrorResponse(errorMessage);
- }
-
// check consumer id
if (!Objects.equals(event.getLastPolledConsumerId(), consumerId)) {
final String errorMessage =
@@ -260,16 +256,45 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
/////////////////////////////// prefetch ///////////////////////////////
@Override
- public synchronized void executePrefetch() {
- super.tryPrefetch();
-
- // prefetch remaining subscription events based on {@link
consumerIdToSubscriptionEventMap}
- for (final SubscriptionEvent event :
consumerIdToSubscriptionEventMap.values()) {
- try {
- event.prefetchRemainingResponses();
- event.trySerializeRemainingResponses();
- } catch (final IOException ignored) {
- }
+ public void executePrefetch() {
+ super.tryPrefetch(false);
+
+ // iterate on the snapshot of the key set
+ final Set<String> consumerIds =
ImmutableSet.copyOf(consumerIdToSubscriptionEventMap.keySet());
+ // NOTE:
+ // 1. Ignore entries added during iteration.
+ // 2. For entries deleted by other threads during iteration, just check if
the value is null.
+ for (final String consumerId : consumerIds) {
+ consumerIdToSubscriptionEventMap.compute(
+ consumerId,
+ (id, ev) -> {
+ if (Objects.isNull(ev)) {
+ return null;
+ }
+
+ // clean up committed event
+ if (ev.isCommitted()) {
+ ev.cleanup();
+ return null; // remove this entry
+ }
+
+ // nack pollable event
+ if (ev.pollable()) {
+ ev.nack();
+ return null; // remove this entry
+ }
+
+ // prefetch and serialize remaining subscription events
+ // NOTE: Since the compute call for the same key is atomic and
will be executed
+ // serially, the current prefetch and serialize operations are
safe.
+ try {
+ ev.prefetchRemainingResponses();
+ ev.trySerializeRemainingResponses();
+ } catch (final IOException ignored) {
+ }
+
+ return ev;
+ });
}
}
@@ -355,16 +380,39 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
/////////////////////////////// commit ///////////////////////////////
+ /**
+ * @return {@code true} if ack successfully
+ */
+ @Override
+ public boolean ack(final String consumerId, final SubscriptionCommitContext
commitContext) {
+ if (super.ack(consumerId, commitContext)) {
+ consumerIdToSubscriptionEventMap.compute(
+ consumerId,
+ (id, ev) -> {
+ if (Objects.nonNull(ev) && Objects.equals(commitContext,
ev.getCommitContext())) {
+ return null; // remove this entry
+ }
+ return ev;
+ });
+ return true;
+ }
+ return false;
+ }
+
/**
* @return {@code true} if nack successfully
*/
@Override
public boolean nack(final String consumerId, final SubscriptionCommitContext
commitContext) {
if (super.nack(consumerId, commitContext)) {
- final SubscriptionEvent event =
consumerIdToSubscriptionEventMap.get(consumerId);
- if (Objects.nonNull(event) && Objects.equals(commitContext,
event.getCommitContext())) {
- consumerIdToSubscriptionEventMap.remove(consumerId);
- }
+ consumerIdToSubscriptionEventMap.compute(
+ consumerId,
+ (id, ev) -> {
+ if (Objects.nonNull(ev) && Objects.equals(commitContext,
ev.getCommitContext())) {
+ return null; // remove this entry
+ }
+ return ev;
+ });
return true;
}
return false;
@@ -372,20 +420,20 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
/////////////////////////////// utility ///////////////////////////////
- private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(
- final String consumerId) {
- final SubscriptionEvent event =
consumerIdToSubscriptionEventMap.get(consumerId);
- if (Objects.isNull(event)) {
- return false;
- }
+ private boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(final String
consumerId) {
+ final SubscriptionEvent event =
+ consumerIdToSubscriptionEventMap.compute(
+ consumerId,
+ (id, ev) -> {
+ if (Objects.nonNull(ev) && ev.isCommitted()) {
+ ev.cleanup();
+ return null; // remove this entry
+ }
- if (event.isCommitted()) {
- event.cleanup();
- consumerIdToSubscriptionEventMap.remove(consumerId);
- return false;
- }
+ return ev;
+ });
- if (!event.pollable()) {
+ if (Objects.nonNull(event) && !event.pollable()) {
LOGGER.info(
"SubscriptionPrefetchingTsFileQueue {} is currently transferring
TsFile (with event {}) to consumer {}",
this,
@@ -397,36 +445,6 @@ class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
return false;
}
- private synchronized SubscriptionEvent
getPollableOnTheFlySubscriptionTsFileEvent(
- final String consumerId) {
- for (final Map.Entry<String, SubscriptionEvent> entry :
- consumerIdToSubscriptionEventMap.entrySet()) {
- final SubscriptionEvent event = entry.getValue();
- if (event.isCommitted()) {
- event.cleanup();
- consumerIdToSubscriptionEventMap.remove(entry.getKey());
- continue;
- }
-
- if (!event.pollable()) {
- continue;
- }
-
- // uncommitted and pollable event
-
- consumerIdToSubscriptionEventMap.remove(entry.getKey());
-
- event.nack();
- consumerIdToSubscriptionEventMap.put(consumerId, event);
-
- event.recordLastPolledConsumerId(consumerId);
- event.recordLastPolledTimestamp();
- return event;
- }
-
- return null;
- }
-
private SubscriptionEvent generateSubscriptionPollErrorResponse(final String
errorMessage) {
// consider non-critical by default, meaning the client can retry
return super.generateSubscriptionPollErrorResponse(errorMessage, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index b8926a80316..36e9804dda4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -106,7 +106,7 @@ public class SubscriptionEvent {
return getResponse(currentResponseIndex);
}
- public SubscriptionPollResponse getResponse(final int index) {
+ private SubscriptionPollResponse getResponse(final int index) {
return responses[index];
}
@@ -120,6 +120,7 @@ public class SubscriptionEvent {
committedTimestamp = System.currentTimeMillis();
}
+ /** NOTE: {@link SubscriptionEvent#cleanup} should be called immediately if
event is committed */
public boolean isCommitted() {
if (commitContext.getCommitId() == INVALID_COMMIT_ID) {
// event with invalid commit id is committed
@@ -162,6 +163,9 @@ public class SubscriptionEvent {
if (lastPolledTimestamp == INVALID_TIMESTAMP) {
return true;
}
+ if (Objects.nonNull(lastPolledConsumerId)) {
+ return false;
+ }
// Recycle events that may not be able to be committed, i.e., those that
have been polled but
// not committed within a certain period of time.
return System.currentTimeMillis() - lastPolledTimestamp
@@ -265,7 +269,7 @@ public class SubscriptionEvent {
* @param index the index of response to be serialized
* @return {@code true} if a serialization operation was actually performed
*/
- public boolean trySerializeResponse(final int index) {
+ private boolean trySerializeResponse(final int index) {
if (index >= responses.length) {
return false;
}
@@ -311,7 +315,7 @@ public class SubscriptionEvent {
/////////////////////////////// tsfile ///////////////////////////////
- public @NonNull SubscriptionPollResponse
generateSubscriptionPollResponseWithPieceOrSealPayload(
+ private @NonNull SubscriptionPollResponse
generateSubscriptionPollResponseWithPieceOrSealPayload(
final long writingOffset) throws IOException {
final File tsFile = pipeEvents.getTsFile();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index e9ea7f423e0..892bcaabb68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -103,7 +103,7 @@ public class SubscriptionPipeTabletEventBatch {
return shouldEmit();
}
- public void ack() {
+ public synchronized void ack() {
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 1e0f91ae787..93f6a9464b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -46,7 +46,7 @@ public class SubscriptionPipeTsFileEventBatch {
return batch.onEvent(event);
}
- public void ack() {
+ public synchronized void ack() {
batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
}