This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new de0a48fc6e6 Optimize pipe event batching and listener stop (#17864) 
(#17885)
de0a48fc6e6 is described below

commit de0a48fc6e61874739aecc3e813515e47b935de0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 10:39:26 2026 +0800

    Optimize pipe event batching and listener stop (#17864) (#17885)
---
 .../batch/PipeTransferBatchReqBuilder.java         | 43 +++++++++++++---------
 .../listener/PipeInsertionDataNodeListener.java    |  2 +-
 2 files changed, 26 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 45264138596..52827ac0008 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -39,10 +39,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -84,8 +84,7 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
   // each endpoint has a batch.
   // This is only used in plain batch since tsfile does not return redirection 
info.
-  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
-      new ConcurrentHashMap<>();
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = 
new HashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
     final boolean usingTsFileBatch =
@@ -178,22 +177,29 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
       getAllNonEmptyAndShouldEmitBatches() {
     final List<Pair<TEndPoint, PipeTabletEventBatch>> 
nonEmptyAndShouldEmitBatches =
-        new ArrayList<>();
+        new ArrayList<>(endPointToBatch.size() + 1);
     if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
       nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
     }
-    endPointToBatch.forEach(
-        (endPoint, batch) -> {
-          if (!batch.isEmpty() && batch.shouldEmit()) {
-            nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
-          }
-        });
+    for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : 
endPointToBatch.entrySet()) {
+      final PipeTabletEventPlainBatch batch = entry.getValue();
+      if (!batch.isEmpty() && batch.shouldEmit()) {
+        nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
+      }
+    }
     return nonEmptyAndShouldEmitBatches;
   }
 
-  public boolean isEmpty() {
-    return defaultBatch.isEmpty()
-        && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
+  public synchronized boolean isEmpty() {
+    if (!defaultBatch.isEmpty()) {
+      return false;
+    }
+    for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+      if (!batch.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   public synchronized void discardEventsOfPipe(
@@ -206,12 +212,13 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(committerKey));
   }
 
-  public int size() {
+  public synchronized int size() {
     try {
-      return defaultBatch.events.size()
-          + endPointToBatch.values().stream()
-              .map(batch -> batch.events.size())
-              .reduce(0, Integer::sum);
+      int size = defaultBatch.events.size();
+      for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+        size += batch.events.size();
+      }
+      return size;
     } catch (final Exception e) {
       LOGGER.warn(
           "Failed to get the size of PipeTransferBatchReqBuilder, return 0. 
Exception: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index ad3586df830..9345d257462 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -63,7 +63,7 @@ public class PipeInsertionDataNodeListener {
         });
   }
 
-  public synchronized void stopListenAndAssign(
+  public void stopListenAndAssign(
       final String dataRegionId, final PipeRealtimeDataRegionSource source) {
     PipeDataRegionAssigner assignerToClose = null;
 

Reply via email to