This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ef6a06a2def Subscription: allow generate subsequent events with the same tablet batch to avoid large message & improve poll logic to avoid unnecessary nack (#14452)
ef6a06a2def is described below

commit ef6a06a2def254e30d152e90f134502fd949082d
Author: VGalaxies <[email protected]>
AuthorDate: Wed Dec 18 00:00:37 2024 +0800

    Subscription: allow generate subsequent events with the same tablet batch to avoid large message & improve poll logic to avoid unnecessary nack (#14452)
---
 .../consumer/SubscriptionConsumer.java             |   3 +-
 .../consumer/SubscriptionPullConsumer.java         |   3 +-
 .../consumer/SubscriptionPushConsumer.java         |   3 +-
 .../subtask/connector/PipeConnectorSubtask.java    |   2 +-
 .../db/subscription/broker/SubscriptionBroker.java | 104 +++++++++++++++++----
 .../broker/SubscriptionPrefetchingQueue.java       |   7 +-
 .../db/subscription/broker/SubscriptionStates.java |  30 +++---
 ...java => SubscriptionCommitContextSupplier.java} |  34 +------
 .../db/subscription/event/SubscriptionEvent.java   |  14 ++-
 .../batch/SubscriptionPipeTabletEventBatch.java    |  57 ++++++++---
 .../SubscriptionEventExtendableResponse.java       |   6 --
 .../event/response/SubscriptionEventResponse.java  |   7 ++
 .../response/SubscriptionEventTabletResponse.java  |  60 ++++++++++--
 .../receiver/SubscriptionReceiverV1.java           |   2 +-
 14 files changed, 234 insertions(+), 98 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index f02ae3b2448..c85bdc4b9a6 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -610,7 +610,8 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
                         LOGGER.warn("unexpected response type: {}", 
responseType);
                         return Optional.empty();
                       })
-                  .apply(response, timer)
+                  // TODO: reuse previous timer?
+                  .apply(response, new PollTimer(System.currentTimeMillis(), 
timeoutMs))
                   .ifPresent(currentMessages::add);
             } catch (final SubscriptionRuntimeNonCriticalException e) {
               LOGGER.warn(
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index ae9f99b969d..874659c4339 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
 import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 
 import org.slf4j.Logger;
@@ -180,7 +181,7 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
       LOGGER.info(
           "SubscriptionPullConsumer {} poll empty message from topics {} after 
{} millisecond(s)",
           this,
-          parsedTopicNames,
+          CollectionUtils.getLimitedString(parsedTopicNames, 32),
           timeoutMs);
       return messages;
     }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 953894eece7..327596d474f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
           LOGGER.info(
               "SubscriptionPushConsumer {} poll empty message from topics {} 
after {} millisecond(s)",
               this,
-              subscribedTopics.keySet(),
+              CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
               autoPollTimeoutMs);
           return;
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index c0470becb3d..09782813fff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -202,7 +202,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       final long startTime = System.currentTimeMillis();
       outputPipeConnector.close();
       LOGGER.info(
-          "Pipe: connector subtask {} was closed {} within {} ms",
+          "Pipe: connector subtask {} ({}) was closed within {} ms",
           taskID,
           outputPipeConnector,
           System.currentTimeMillis() - startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 74db75b277c..df888ec1b03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -34,12 +34,15 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -88,18 +91,92 @@ public class SubscriptionBroker {
 
   public List<SubscriptionEvent> poll(
       final String consumerId, final Set<String> topicNames, final long 
maxBytes) {
-    final List<SubscriptionEvent> events = new ArrayList<>();
+    final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
+    final Set<String> candidateTopicNames = 
prepareCandidateTopicNames(topicNames, eventsToPoll);
+
+    // Sort topic names based on the current subscription states (number of 
events received per
+    // topic)
+    final List<String> sortedTopicNames = new ArrayList<>(candidateTopicNames);
+    sortedTopicNames.sort(
+        Comparator.comparingLong(
+            topicName ->
+                
Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
+                    .getStates(topicName)));
+
+    final List<SubscriptionEvent> eventsToNack = new ArrayList<>();
+    long totalSize = 0;
+    final Map<String, Long> topicNameToIncrements = new HashMap<>();
+
+    // Iterate over each sorted topic name and poll the corresponding events
+    for (final String topicName : sortedTopicNames) {
+      final SubscriptionPrefetchingQueue prefetchingQueue =
+          topicNameToPrefetchingQueue.get(topicName);
+      // Recheck
+      if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed()) {
+        continue;
+      }
+
+      // Poll the event from the prefetching queue
+      final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
+      if (Objects.isNull(event)) {
+        continue;
+      }
+
+      // Try to get the current size of the event
+      final long currentSize;
+      try {
+        currentSize = event.getCurrentResponseSize();
+      } catch (final IOException e) {
+        // If there is an error getting the event's size, nack the event
+        eventsToNack.add(event);
+        continue;
+      }
+
+      // Add the event to the poll list
+      eventsToPoll.add(event);
+
+      // Increment the event count for the topic
+      topicNameToIncrements.merge(event.getCommitContext().getTopicName(), 1L, 
Long::sum);
+
+      // Update the total size
+      totalSize += currentSize;
+
+      // If adding this event exceeds the maxBytes (pessimistic estimation), 
break the loop
+      if (totalSize + currentSize > maxBytes) {
+        break;
+      }
+    }
+
+    // Update the subscription states with the increments for the topics 
processed
+    Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
+        .updateStates(topicNameToIncrements);
+
+    // Commit the nack events for the consumer
+    commit(
+        consumerId,
+        
eventsToNack.stream().map(SubscriptionEvent::getCommitContext).collect(Collectors.toList()),
+        true);
+
+    // Return the list of events that are to be polled
+    return eventsToPoll;
+  }
+
+  private Set<String> prepareCandidateTopicNames(
+      final Set<String> topicNames,
+      final List<SubscriptionEvent> eventsToPoll /* output parameter */) {
+    final Set<String> candidateTopicNames = new HashSet<>();
     for (final String topicName : topicNames) {
       final SubscriptionPrefetchingQueue prefetchingQueue =
           topicNameToPrefetchingQueue.get(topicName);
+      // If there is no prefetching queue for the topic, check if it's 
completed
       if (Objects.isNull(prefetchingQueue)) {
-        // check if completed
         if (completedTopicNames.containsKey(topicName)) {
           LOGGER.info(
               "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] is completed, return termination response to client",
               topicName,
               brokerId);
-          events.add(
+          // Add a termination event for the completed topic
+          eventsToPoll.add(
               new SubscriptionEvent(
                   SubscriptionPollResponseType.TERMINATION.getType(),
                   new TerminationPayload(),
@@ -118,6 +195,8 @@ public class SubscriptionBroker {
         //   2.2. potential disorder of unbind and close prefetching queue...
         continue;
       }
+
+      // Check if the prefetching queue is closed
       if (prefetchingQueue.isClosed()) {
         LOGGER.warn(
             "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
@@ -125,22 +204,11 @@ public class SubscriptionBroker {
             brokerId);
         continue;
       }
-      final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
-      if (Objects.nonNull(event)) {
-        events.add(event);
-      }
+
+      candidateTopicNames.add(topicName);
     }
 
-    final Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> 
eventsToPollWithEventsToNack =
-        Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
-            .filter(events, maxBytes);
-    commit(
-        consumerId,
-        eventsToPollWithEventsToNack.right.stream()
-            .map(SubscriptionEvent::getCommitContext)
-            .collect(Collectors.toList()),
-        true);
-    return eventsToPollWithEventsToNack.left;
+    return candidateTopicNames;
   }
 
   public List<SubscriptionEvent> pollTsFile(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 433052409d9..20a0d0983f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
@@ -343,8 +342,8 @@ public abstract class SubscriptionPrefetchingQueue {
         continue;
       }
 
-      if (event instanceof PipeTsFileInsertionEvent) {
-        if (onEvent((PipeTsFileInsertionEvent) event)) {
+      if (event instanceof TsFileInsertionEvent) {
+        if (onEvent((TsFileInsertionEvent) event)) {
           return;
         }
         continue;
@@ -448,7 +447,7 @@ public abstract class SubscriptionPrefetchingQueue {
                 this);
           }
 
-          ev.ack();
+          ev.ack(this::enqueueEventToPrefetchingQueue);
           ev.recordCommittedTimestamp(); // now committed
           acked.set(true);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
index caa1c0fd1e2..56d235adcc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 
 import org.apache.tsfile.utils.Pair;
@@ -51,6 +52,7 @@ public class SubscriptionStates {
    * @return a Pair containing two lists: the first list contains the events 
to poll, and the second
    *     list contains the events to nack
    */
+  @TestOnly
   public Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> filter(
       final List<SubscriptionEvent> events, final long maxBytes) {
     final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
@@ -82,40 +84,40 @@ public class SubscriptionStates {
     }
 
     // Update the subscription state with the increments calculated during 
filtering
-    update(topicNameToIncrements);
+    updateStates(topicNameToIncrements);
     return new Pair<>(eventsToPoll, eventsToNack);
   }
 
+  /**
+   * Sorts a list of SubscriptionEvents according to the event count in the 
subscription states.
+   * Events with fewer counts are prioritized.
+   *
+   * @param events the list of events to sort
+   */
+  private void sort(final List<SubscriptionEvent> events) {
+    events.sort(
+        Comparator.comparingLong(event -> 
getStates(event.getCommitContext().getTopicName())));
+  }
+
   /**
    * Updates the subscription state by incrementing the event count for 
multiple topics.
    *
    * @param topicNameToIncrements a map where the key is the topic name and 
the value is the number
    *     of events to add to the count
    */
-  private void update(final Map<String, Long> topicNameToIncrements) {
+  public void updateStates(final Map<String, Long> topicNameToIncrements) {
     for (final Entry<String, Long> entry : topicNameToIncrements.entrySet()) {
       topicNameToEventCount.merge(entry.getKey(), entry.getValue(), Long::sum);
     }
   }
 
-  /**
-   * Sorts a list of SubscriptionEvents according to the event count in the 
subscription states.
-   * Events with fewer counts are prioritized.
-   *
-   * @param events the list of events to sort
-   */
-  private void sort(final List<SubscriptionEvent> events) {
-    events.sort(
-        Comparator.comparingLong(event -> 
getCount(event.getCommitContext().getTopicName())));
-  }
-
   /**
    * Returns the number of events received for a specific topic.
    *
    * @param topicName the name of the topic
    * @return the number of events received for the topic
    */
-  private long getCount(final String topicName) {
+  public long getStates(final String topicName) {
     return topicNameToEventCount.getOrDefault(topicName, 0L);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
index 3ded870feed..cc73384d42c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
@@ -17,36 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.subscription.event.response;
+package org.apache.iotdb.db.subscription.event;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 
-public interface SubscriptionEventResponse<E> {
+@FunctionalInterface
+public interface SubscriptionCommitContextSupplier {
 
-  /////////////////////////////// response ///////////////////////////////
-
-  E getCurrentResponse();
-
-  void prefetchRemainingResponses() throws Exception;
-
-  void fetchNextResponse(final long offset) throws Exception;
-
-  /////////////////////////////// byte buffer ///////////////////////////////
-
-  void trySerializeCurrentResponse();
-
-  void trySerializeRemainingResponses();
-
-  ByteBuffer getCurrentResponseByteBuffer() throws IOException;
-
-  void invalidateCurrentResponseByteBuffer();
-
-  /////////////////////////////// lifecycle ///////////////////////////////
-
-  void nack();
-
-  void cleanUp();
-
-  boolean isCommittable();
+  SubscriptionCommitContext get();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index ec0ed1fc079..97fa621faef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
 
@@ -87,9 +88,12 @@ public class SubscriptionEvent {
    * SubscriptionEventTabletResponse}.
    */
   public SubscriptionEvent(
-      final SubscriptionPipeTabletEventBatch batch, final 
SubscriptionCommitContext commitContext) {
+      final SubscriptionPipeTabletEventBatch batch,
+      final SubscriptionCommitContextSupplier commitContextSupplier) {
     this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
-    this.response = new SubscriptionEventTabletResponse(batch, commitContext);
+    final SubscriptionCommitContext commitContext = 
commitContextSupplier.get();
+    this.response =
+        new SubscriptionEventTabletResponse(batch, commitContext, 
commitContextSupplier);
     this.commitContext = commitContext;
   }
 
@@ -138,7 +142,11 @@ public class SubscriptionEvent {
     return response.isCommittable();
   }
 
-  public void ack() {
+  public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
+    // ack response
+    response.ack(onCommittedHook);
+
+    // ack pipe events
     pipeEvents.ack();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index fe5ffee564f..59528125f49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -37,6 +37,7 @@ import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -52,12 +53,15 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   private long firstEventProcessingTime = Long.MIN_VALUE;
   private long totalBufferSize = 0;
 
-  private volatile Iterator<EnrichedEvent> enrichedEventsIterator;
+  private volatile Iterator<EnrichedEvent> currentEnrichedEventsIterator;
   private volatile Iterator<TabletInsertionEvent> 
currentTabletInsertionEventsIterator;
+  private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
 
   private final Meter insertNodeTabletInsertionEventSizeEstimator;
   private final Meter rawTabletInsertionEventSizeEstimator;
 
+  private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+
   public SubscriptionPipeTabletEventBatch(
       final int regionId,
       final SubscriptionPrefetchingTabletQueue prefetchingQueue,
@@ -75,21 +79,29 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   public synchronized void ack() {
-    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+    // only decrease the reference count of iterated events
+    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
       enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
     }
+    iteratedEnrichedEvents.clear();
   }
 
   @Override
   public synchronized void cleanUp() {
+    // do nothing if it has next
+    if (hasNext()) {
+      return;
+    }
+
     // clear the reference count of events
     for (final EnrichedEvent enrichedEvent : enrichedEvents) {
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
     enrichedEvents.clear();
 
-    enrichedEventsIterator = null;
+    currentEnrichedEventsIterator = null;
     currentTabletInsertionEventsIterator = null;
+    currentTsFileInsertionEvent = null;
   }
 
   /////////////////////////////// utility ///////////////////////////////
@@ -119,13 +131,17 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
     // update buffer size
     // TODO: more precise computation
+    // NOTE: Considering the possibility of large tsfile, the final generated 
response size may be
+    // larger than totalBufferSize, therefore limit control is also required in
+    // SubscriptionEventTabletResponse.
     totalBufferSize += ((PipeTsFileInsertionEvent) event).getTsFile().length();
   }
 
   @Override
   protected List<SubscriptionEvent> generateSubscriptionEvents() {
+    resetIterator();
     return Collections.singletonList(
-        new SubscriptionEvent(this, 
prefetchingQueue.generateSubscriptionCommitContext()));
+        new SubscriptionEvent(this, 
prefetchingQueue::generateSubscriptionCommitContext));
   }
 
   @Override
@@ -179,7 +195,11 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   /////////////////////////////// iterator ///////////////////////////////
 
   public void resetIterator() {
-    enrichedEventsIterator = enrichedEvents.iterator();
+    currentEnrichedEventsIterator = enrichedEvents.iterator();
+    currentTabletInsertionEventsIterator = null;
+    currentTsFileInsertionEvent = null;
+
+    iteratedEnrichedEvents.clear();
   }
 
   @Override
@@ -190,19 +210,20 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       } else {
         // reset
         currentTabletInsertionEventsIterator = null;
+        currentTsFileInsertionEvent = null;
         return false;
       }
     }
 
-    if (Objects.isNull(enrichedEventsIterator)) {
+    if (Objects.isNull(currentEnrichedEventsIterator)) {
       return false;
     }
 
-    if (enrichedEventsIterator.hasNext()) {
+    if (currentEnrichedEventsIterator.hasNext()) {
       return true;
     } else {
       // reset
-      enrichedEventsIterator = null;
+      currentEnrichedEventsIterator = null;
       return false;
     }
   }
@@ -211,33 +232,43 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   public List<Tablet> next() {
     if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
       if (currentTabletInsertionEventsIterator.hasNext()) {
-        return convertToTablets(currentTabletInsertionEventsIterator.next());
+        final TabletInsertionEvent tabletInsertionEvent =
+            currentTabletInsertionEventsIterator.next();
+        if (!currentTabletInsertionEventsIterator.hasNext()) {
+          iteratedEnrichedEvents.add((EnrichedEvent) 
currentTsFileInsertionEvent);
+        }
+        return convertToTablets(tabletInsertionEvent);
       } else {
         currentTabletInsertionEventsIterator = null;
+        currentTsFileInsertionEvent = null;
       }
     }
 
-    if (Objects.isNull(enrichedEventsIterator)) {
+    if (Objects.isNull(currentEnrichedEventsIterator)) {
       return null;
     }
 
-    if (!enrichedEventsIterator.hasNext()) {
+    if (!currentEnrichedEventsIterator.hasNext()) {
       return null;
     }
 
-    final EnrichedEvent enrichedEvent = enrichedEventsIterator.next();
+    final EnrichedEvent enrichedEvent = currentEnrichedEventsIterator.next();
     if (enrichedEvent instanceof TsFileInsertionEvent) {
       if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
         LOGGER.warn(
             "SubscriptionPipeTabletEventBatch {} override non-null 
currentTabletInsertionEventsIterator when iterating (broken invariant).",
             this);
       }
+      final PipeTsFileInsertionEvent tsFileInsertionEvent =
+          (PipeTsFileInsertionEvent) enrichedEvent;
+      currentTsFileInsertionEvent = tsFileInsertionEvent;
       currentTabletInsertionEventsIterator =
-          ((PipeTsFileInsertionEvent) enrichedEvent)
+          tsFileInsertionEvent
               
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
               .iterator();
       return next();
     } else if (enrichedEvent instanceof TabletInsertionEvent) {
+      iteratedEnrichedEvents.add(enrichedEvent);
       return convertToTablets((TabletInsertionEvent) enrichedEvent);
     } else {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
index dbbe9d898c8..dba2f48b466 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -22,9 +22,6 @@ package org.apache.iotdb.db.subscription.event.response;
 import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
 import 
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Deque;
@@ -42,9 +39,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 public abstract class SubscriptionEventExtendableResponse
     implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
-
   private final Deque<CachedSubscriptionPollResponse> responses;
   protected volatile boolean hasNoMore = false;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index 3ded870feed..ed6ad138276 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.subscription.event.response;
 
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.function.Consumer;
 
 public interface SubscriptionEventResponse<E> {
 
@@ -44,6 +47,10 @@ public interface SubscriptionEventResponse<E> {
 
   /////////////////////////////// lifecycle ///////////////////////////////
 
+  default void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
+    // do nothing
+  }
+
   void nack();
 
   void cleanUp();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index 61cce24294d..35591ed405e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.subscription.event.response;
 
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.subscription.event.SubscriptionCommitContextSupplier;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
 import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
@@ -36,6 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 /**
  * The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -51,16 +54,27 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
   private static final long READ_TABLET_BUFFER_SIZE =
       SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
 
+  private static final long PREFETCH_TABLET_BUFFER_SIZE =
+      SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes();
+
   private final SubscriptionPipeTabletEventBatch batch;
   private final SubscriptionCommitContext commitContext;
+  private final SubscriptionCommitContextSupplier commitContextSupplier;
 
-  private volatile int tabletsSize;
+  private volatile int totalTablets;
   private final AtomicInteger nextOffset = new AtomicInteger(0);
 
+  // used to limit message size: remaining tablets are deferred to a subsequent event
+  private volatile long totalBufferSize;
+  private volatile boolean availableForNext = false;
+
   public SubscriptionEventTabletResponse(
-      final SubscriptionPipeTabletEventBatch batch, final SubscriptionCommitContext commitContext) {
+      final SubscriptionPipeTabletEventBatch batch,
+      final SubscriptionCommitContext commitContext,
+      final SubscriptionCommitContextSupplier commitContextSupplier) {
     this.batch = batch;
     this.commitContext = commitContext;
+    this.commitContextSupplier = commitContextSupplier;
 
     init();
   }
@@ -80,13 +94,26 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
     }
   }
 
+  @Override
+  public synchronized void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
+    if (availableForNext) {
+      // generate next subscription event with the same batch
+      onCommittedHook.accept(new SubscriptionEvent(batch, commitContextSupplier));
+    }
+  }
+
   @Override
   public synchronized void nack() {
     if (nextOffset.get() == 1) {
       // do nothing if with complete tablets
       return;
     }
+
     cleanUp();
+
+    // reset the batch iterator here rather than in init(), which must not rewind it
+    // TODO: avoid completely rewinding the iterator
+    batch.resetIterator();
     init();
   }
 
@@ -94,8 +121,16 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
   public synchronized void cleanUp() {
     super.cleanUp();
 
-    tabletsSize = 0;
+    totalTablets = 0;
     nextOffset.set(0);
+
+    totalBufferSize = 0;
+    availableForNext = false;
+  }
+
+  @Override
+  public boolean isCommittable() {
+    return (availableForNext || hasNoMore) && size() == 1;
   }
 
   /////////////////////////////// utility ///////////////////////////////
@@ -108,11 +143,17 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
       return;
     }
 
-    batch.resetIterator();
     offer(generateNextTabletResponse());
   }
 
   private synchronized CachedSubscriptionPollResponse generateNextTabletResponse() {
+    if (availableForNext) {
+      return new CachedSubscriptionPollResponse(
+          SubscriptionPollResponseType.TABLETS.getType(),
+          new TabletsPayload(Collections.emptyList(), -totalTablets),
+          commitContext);
+    }
+
     final List<Tablet> currentTablets = new ArrayList<>();
     long currentBufferSize = 0;
 
@@ -128,7 +169,8 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
               .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
               .reduce(Long::sum)
               .orElse(0L);
-      tabletsSize += tablets.size();
+      totalTablets += tablets.size();
+      totalBufferSize += bufferSize;
 
       if (bufferSize > READ_TABLET_BUFFER_SIZE) {
         // TODO: split tablets
@@ -148,6 +190,12 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
       }
 
       currentBufferSize += bufferSize;
+
+      // stop here to avoid a large message; remaining tablets go to the next event
+      if (totalBufferSize > PREFETCH_TABLET_BUFFER_SIZE && batch.hasNext()) {
+        availableForNext = true;
+        break;
+      }
     }
 
     final CachedSubscriptionPollResponse response;
@@ -155,7 +203,7 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
       response =
           new CachedSubscriptionPollResponse(
               SubscriptionPollResponseType.TABLETS.getType(),
-              new TabletsPayload(Collections.emptyList(), -tabletsSize),
+              new TabletsPayload(Collections.emptyList(), -totalTablets),
               commitContext);
       hasNoMore = true;
     } else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 73ee10c2854..7a7459d0ab2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -89,7 +89,7 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class);
 
-  private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.95;
+  private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9;
 
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();


Reply via email to