This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c6fe5c352b9 Subscription: improve end-to-end latency (#12925)
c6fe5c352b9 is described below

commit c6fe5c352b95883b17d11e45ad7994d0335fbf53
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Jul 16 17:36:41 2024 +0800

    Subscription: improve end-to-end latency (#12925)
    
     Specifically includes the following optimization strategies:
    
     1. Each time the consumer polls, it may trigger the `trySealBatch` request 
to batch.
     2. Shorten the interval parameters for some consumer operations.
     3. Optimize `SubscriptionPrefetchingTsFileQueue` lock contention.
---
 .../rpc/subscription/config/ConsumerConstant.java  |   3 +-
 .../consumer/SubscriptionConsumer.java             |   2 +-
 .../consumer/SubscriptionPushConsumer.java         |   6 +-
 .../subscription/SubscriptionCoordinator.java      |   6 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  67 +++++---
 .../broker/SubscriptionPrefetchingTabletQueue.java |  15 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java | 168 ++++++++++++---------
 .../db/subscription/event/SubscriptionEvent.java   |  10 +-
 .../batch/SubscriptionPipeTabletEventBatch.java    |   2 +-
 .../batch/SubscriptionPipeTsFileEventBatch.java    |   2 +-
 10 files changed, 166 insertions(+), 115 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index d449b2f9741..3693e89c3bf 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -65,8 +65,7 @@ public class ConsumerConstant {
   public static final String CONSUME_LISTENER_KEY = "consume-listener";
 
   public static final String AUTO_POLL_INTERVAL_MS_KEY = 
"auto-poll-interval-ms";
-  public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
-  public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L;
+  public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 100L;
 
   public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms";
   public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 633f63ae7a9..ddf8abee931 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -82,7 +82,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConsumer.class);
 
-  private static final long SLEEP_NS = 1_000_000_000L;
+  private static final long SLEEP_NS = 100_000_000L; // 100ms
 
   private final String username;
   private final String password;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index e28cf3b7c7e..5143958b89a 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -102,8 +102,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
     this.ackStrategy = ackStrategy;
     this.consumeListener = consumeListener;
 
-    this.autoPollIntervalMs =
-        Math.max(autoPollIntervalMs, 
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+    this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
     this.autoPollTimeoutMs =
         Math.max(autoPollTimeoutMs, 
ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
   }
@@ -290,8 +289,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
     }
 
     public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
-      this.autoPollIntervalMs =
-          Math.max(autoPollIntervalMs, 
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+      this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
       return this;
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 2d72f3c6c70..c48f3e0f078 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -51,6 +51,8 @@ public class SubscriptionCoordinator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionCoordinator.class);
 
   private final ConfigManager configManager;
+
+  // NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE
   private final SubscriptionInfo subscriptionInfo;
 
   private final PipeTaskCoordinatorLock coordinatorLock;
@@ -122,7 +124,9 @@ public class SubscriptionCoordinator {
     subscriptionMetaSyncer.stop();
   }
 
-  /** Caller should ensure that the method is called in the lock {@link 
#tryLock}. */
+  /**
+   * Caller should ensure that the method is called in the write lock of 
{@link #subscriptionInfo}.
+   */
   public void updateLastSyncedVersion() {
     subscriptionInfo.updateLastSyncedVersion();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 2d23950fe75..f88368d01c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -93,7 +93,7 @@ public abstract class SubscriptionPrefetchingQueue {
 
   public SubscriptionEvent poll(final String consumerId) {
     if (prefetchingQueue.isEmpty()) {
-      tryPrefetch();
+      tryPrefetch(true);
     }
 
     final long size = prefetchingQueue.size();
@@ -140,14 +140,27 @@ public abstract class SubscriptionPrefetchingQueue {
   public abstract void executePrefetch();
 
   /**
-   * prefetch at most one {@link SubscriptionEvent} from {@link
+   * Prefetch at most one {@link SubscriptionEvent} from {@link
    * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
-   * SubscriptionPrefetchingQueue#prefetchingQueue}
+   * SubscriptionPrefetchingQueue#prefetchingQueue}.
+   *
+   * <p>It will continuously attempt to prefetch and generate a {@link 
SubscriptionEvent} until
+   * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
+   *
+   * @param trySealBatchIfEmpty {@code true} if {@link 
SubscriptionPrefetchingQueue#trySealBatch} is
+   *     called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is 
empty, {@code false}
+   *     otherwise
    */
-  protected void tryPrefetch() {
-    Event event;
-    while (Objects.nonNull(
-        event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) {
+  protected void tryPrefetch(final boolean trySealBatchIfEmpty) {
+    while (!inputPendingQueue.isEmpty()) {
+      final Event event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
+      if (Objects.isNull(event)) {
+        // The event will be null in two cases:
+        // 1. The inputPendingQueue is empty.
+        // 2. The tsfile event has been deduplicated.
+        continue;
+      }
+
       if (!(event instanceof EnrichedEvent)) {
         LOGGER.warn(
             "Subscription: SubscriptionPrefetchingQueue {} only support 
prefetch EnrichedEvent. Ignore {}.",
@@ -169,26 +182,35 @@ public abstract class SubscriptionPrefetchingQueue {
 
       if (event instanceof TabletInsertionEvent) {
         if (onEvent((TabletInsertionEvent) event)) {
-          break;
+          return;
         }
-      } else if (event instanceof PipeTsFileInsertionEvent) {
+        continue;
+      }
+
+      if (event instanceof PipeTsFileInsertionEvent) {
         if (onEvent((PipeTsFileInsertionEvent) event)) {
-          break;
-        }
-      } else {
-        // TODO:
-        //  - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
-        //  - UserDefinedEnrichedEvent: ignored?
-        //  - Others: events related to meta sync, safe to ignore
-        LOGGER.info(
-            "Subscription: SubscriptionPrefetchingQueue {} ignore 
EnrichedEvent {} when prefetching.",
-            this,
-            event);
-        if (trySealBatch()) {
-          break;
+          return;
         }
+        continue;
+      }
+
+      // TODO:
+      //  - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
+      //  - UserDefinedEnrichedEvent: ignored?
+      //  - Others: events related to meta sync, safe to ignore
+      LOGGER.info(
+          "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent 
{} when prefetching.",
+          this,
+          event);
+      if (trySealBatch()) {
+        return;
       }
     }
+
+    // At this moment, the inputPendingQueue is empty.
+    if (trySealBatchIfEmpty) {
+      trySealBatch();
+    }
   }
 
   /**
@@ -222,6 +244,7 @@ public abstract class SubscriptionPrefetchingQueue {
     }
 
     if (event.isCommitted()) {
+      event.cleanup();
       LOGGER.warn(
           "Subscription: subscription event {} is committed, subscription 
commit context {}, prefetching queue: {}",
           event,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 3aa36425b88..b0f69da2e76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -83,7 +83,7 @@ class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
 
   @Override
   public void executePrefetch() {
-    super.tryPrefetch();
+    super.tryPrefetch(false);
     this.serializeEventsInQueue();
   }
 
@@ -114,13 +114,18 @@ class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
 
   @Override
   protected boolean trySealBatch() {
+    final AtomicBoolean result = new AtomicBoolean(false);
     currentBatchRef.getAndUpdate(
         (batch) -> {
-          sealBatch(batch);
-          return new SubscriptionPipeTabletEventBatch(
-              BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+          if (batch.shouldEmit()) {
+            sealBatch(batch);
+            result.set(true);
+            return new SubscriptionPipeTabletEventBatch(
+                BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+          }
+          return batch;
         });
-    return true;
+    return result.get();
   }
 
   private void sealBatch(final SubscriptionPipeTabletEventBatch batch) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index b0aed50b8bf..26fb19846b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 
+import com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -95,16 +97,11 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   @Override
   public SubscriptionEvent poll(final String consumerId) {
+    // check before polling event from prefetching queue
     if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
       return null;
     }
 
-    final SubscriptionEvent pollableEvent = 
getPollableOnTheFlySubscriptionTsFileEvent(consumerId);
-    if (Objects.nonNull(pollableEvent)) {
-      return pollableEvent;
-    }
-
-    // At this point, the event that might be polled should not have been 
polled by any consumer.
     final SubscriptionEvent event = super.poll(consumerId);
     if (Objects.nonNull(event)) {
       consumerIdToSubscriptionEventMap.put(consumerId, event);
@@ -113,10 +110,20 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
     return event;
   }
 
-  public synchronized @NonNull SubscriptionEvent pollTsFile(
+  public @NonNull SubscriptionEvent pollTsFile(
       final String consumerId, final String fileName, final long 
writingOffset) {
     // 1. Extract current event and check it
-    final SubscriptionEvent event = 
consumerIdToSubscriptionEventMap.get(consumerId);
+    final SubscriptionEvent event =
+        consumerIdToSubscriptionEventMap.compute(
+            consumerId,
+            (id, ev) -> {
+              if (Objects.nonNull(ev) && ev.isCommitted()) {
+                ev.cleanup();
+                return null; // remove this entry
+              }
+              return ev;
+            });
+
     if (Objects.isNull(event)) {
       final String errorMessage =
           String.format(
@@ -126,17 +133,6 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
       return generateSubscriptionPollErrorResponse(errorMessage);
     }
 
-    if (event.isCommitted()) {
-      event.cleanup();
-      consumerIdToSubscriptionEventMap.remove(consumerId);
-      final String errorMessage =
-          String.format(
-              "SubscriptionEvent %s related to TsFile is committed, consumer: 
%s, writing offset: %s, prefetching queue: %s",
-              event, consumerId, writingOffset, this);
-      LOGGER.warn(errorMessage);
-      return generateSubscriptionPollErrorResponse(errorMessage);
-    }
-
     // check consumer id
     if (!Objects.equals(event.getLastPolledConsumerId(), consumerId)) {
       final String errorMessage =
@@ -260,16 +256,45 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
   /////////////////////////////// prefetch ///////////////////////////////
 
   @Override
-  public synchronized void executePrefetch() {
-    super.tryPrefetch();
-
-    // prefetch remaining subscription events based on {@link 
consumerIdToSubscriptionEventMap}
-    for (final SubscriptionEvent event : 
consumerIdToSubscriptionEventMap.values()) {
-      try {
-        event.prefetchRemainingResponses();
-        event.trySerializeRemainingResponses();
-      } catch (final IOException ignored) {
-      }
+  public void executePrefetch() {
+    super.tryPrefetch(false);
+
+    // iterate on the snapshot of the key set
+    final Set<String> consumerIds = 
ImmutableSet.copyOf(consumerIdToSubscriptionEventMap.keySet());
+    // NOTE:
+    // 1. Ignore entries added during iteration.
+    // 2. For entries deleted by other threads during iteration, just check if 
the value is null.
+    for (final String consumerId : consumerIds) {
+      consumerIdToSubscriptionEventMap.compute(
+          consumerId,
+          (id, ev) -> {
+            if (Objects.isNull(ev)) {
+              return null;
+            }
+
+            // clean up committed event
+            if (ev.isCommitted()) {
+              ev.cleanup();
+              return null; // remove this entry
+            }
+
+            // nack pollable event
+            if (ev.pollable()) {
+              ev.nack();
+              return null; // remove this entry
+            }
+
+            // prefetch and serialize remaining subscription events
+            // NOTE: Since the compute call for the same key is atomic and 
will be executed
+            // serially, the current prefetch and serialize operations are 
safe.
+            try {
+              ev.prefetchRemainingResponses();
+              ev.trySerializeRemainingResponses();
+            } catch (final IOException ignored) {
+            }
+
+            return ev;
+          });
     }
   }
 
@@ -355,16 +380,39 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   /////////////////////////////// commit ///////////////////////////////
 
+  /**
+   * @return {@code true} if ack successfully
+   */
+  @Override
+  public boolean ack(final String consumerId, final SubscriptionCommitContext 
commitContext) {
+    if (super.ack(consumerId, commitContext)) {
+      consumerIdToSubscriptionEventMap.compute(
+          consumerId,
+          (id, ev) -> {
+            if (Objects.nonNull(ev) && Objects.equals(commitContext, 
ev.getCommitContext())) {
+              return null; // remove this entry
+            }
+            return ev;
+          });
+      return true;
+    }
+    return false;
+  }
+
   /**
    * @return {@code true} if nack successfully
    */
   @Override
   public boolean nack(final String consumerId, final SubscriptionCommitContext 
commitContext) {
     if (super.nack(consumerId, commitContext)) {
-      final SubscriptionEvent event = 
consumerIdToSubscriptionEventMap.get(consumerId);
-      if (Objects.nonNull(event) && Objects.equals(commitContext, 
event.getCommitContext())) {
-        consumerIdToSubscriptionEventMap.remove(consumerId);
-      }
+      consumerIdToSubscriptionEventMap.compute(
+          consumerId,
+          (id, ev) -> {
+            if (Objects.nonNull(ev) && Objects.equals(commitContext, 
ev.getCommitContext())) {
+              return null; // remove this entry
+            }
+            return ev;
+          });
       return true;
     }
     return false;
@@ -372,20 +420,20 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   /////////////////////////////// utility ///////////////////////////////
 
-  private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(
-      final String consumerId) {
-    final SubscriptionEvent event = 
consumerIdToSubscriptionEventMap.get(consumerId);
-    if (Objects.isNull(event)) {
-      return false;
-    }
+  private boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(final String 
consumerId) {
+    final SubscriptionEvent event =
+        consumerIdToSubscriptionEventMap.compute(
+            consumerId,
+            (id, ev) -> {
+              if (Objects.nonNull(ev) && ev.isCommitted()) {
+                ev.cleanup();
+                return null; // remove this entry
+              }
 
-    if (event.isCommitted()) {
-      event.cleanup();
-      consumerIdToSubscriptionEventMap.remove(consumerId);
-      return false;
-    }
+              return ev;
+            });
 
-    if (!event.pollable()) {
+    if (Objects.nonNull(event) && !event.pollable()) {
       LOGGER.info(
           "SubscriptionPrefetchingTsFileQueue {} is currently transferring 
TsFile (with event {}) to consumer {}",
           this,
@@ -397,36 +445,6 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
     return false;
   }
 
-  private synchronized SubscriptionEvent 
getPollableOnTheFlySubscriptionTsFileEvent(
-      final String consumerId) {
-    for (final Map.Entry<String, SubscriptionEvent> entry :
-        consumerIdToSubscriptionEventMap.entrySet()) {
-      final SubscriptionEvent event = entry.getValue();
-      if (event.isCommitted()) {
-        event.cleanup();
-        consumerIdToSubscriptionEventMap.remove(entry.getKey());
-        continue;
-      }
-
-      if (!event.pollable()) {
-        continue;
-      }
-
-      // uncommitted and pollable event
-
-      consumerIdToSubscriptionEventMap.remove(entry.getKey());
-
-      event.nack();
-      consumerIdToSubscriptionEventMap.put(consumerId, event);
-
-      event.recordLastPolledConsumerId(consumerId);
-      event.recordLastPolledTimestamp();
-      return event;
-    }
-
-    return null;
-  }
-
   private SubscriptionEvent generateSubscriptionPollErrorResponse(final String 
errorMessage) {
     // consider non-critical by default, meaning the client can retry
     return super.generateSubscriptionPollErrorResponse(errorMessage, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index b8926a80316..36e9804dda4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -106,7 +106,7 @@ public class SubscriptionEvent {
     return getResponse(currentResponseIndex);
   }
 
-  public SubscriptionPollResponse getResponse(final int index) {
+  private SubscriptionPollResponse getResponse(final int index) {
     return responses[index];
   }
 
@@ -120,6 +120,7 @@ public class SubscriptionEvent {
     committedTimestamp = System.currentTimeMillis();
   }
 
+  /** NOTE: {@link SubscriptionEvent#cleanup} should be called immediately if 
event is committed */
   public boolean isCommitted() {
     if (commitContext.getCommitId() == INVALID_COMMIT_ID) {
       // event with invalid commit id is committed
@@ -162,6 +163,9 @@ public class SubscriptionEvent {
     if (lastPolledTimestamp == INVALID_TIMESTAMP) {
       return true;
     }
+    if (Objects.nonNull(lastPolledConsumerId)) {
+      return false;
+    }
     // Recycle events that may not be able to be committed, i.e., those that 
have been polled but
     // not committed within a certain period of time.
     return System.currentTimeMillis() - lastPolledTimestamp
@@ -265,7 +269,7 @@ public class SubscriptionEvent {
    * @param index the index of response to be serialized
    * @return {@code true} if a serialization operation was actually performed
    */
-  public boolean trySerializeResponse(final int index) {
+  private boolean trySerializeResponse(final int index) {
     if (index >= responses.length) {
       return false;
     }
@@ -311,7 +315,7 @@ public class SubscriptionEvent {
 
   /////////////////////////////// tsfile ///////////////////////////////
 
-  public @NonNull SubscriptionPollResponse 
generateSubscriptionPollResponseWithPieceOrSealPayload(
+  private @NonNull SubscriptionPollResponse 
generateSubscriptionPollResponseWithPieceOrSealPayload(
       final long writingOffset) throws IOException {
     final File tsFile = pipeEvents.getTsFile();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index e9ea7f423e0..892bcaabb68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -103,7 +103,7 @@ public class SubscriptionPipeTabletEventBatch {
     return shouldEmit();
   }
 
-  public void ack() {
+  public synchronized void ack() {
     for (final EnrichedEvent enrichedEvent : enrichedEvents) {
       enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 1e0f91ae787..93f6a9464b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -46,7 +46,7 @@ public class SubscriptionPipeTsFileEventBatch {
     return batch.onEvent(event);
   }
 
-  public void ack() {
+  public synchronized void ack() {
     batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
   }
 

Reply via email to