This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6532d297bca Optimize pipe event batching and listener stop (#17864)
6532d297bca is described below

commit 6532d297bca61aee8272476e04c9fcecec2e787b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 12:13:21 2026 +0800

    Optimize pipe event batching and listener stop (#17864)
---
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 22 ++++++-----
 .../batch/PipeTransferBatchReqBuilder.java         | 43 +++++++++++++---------
 .../listener/PipeInsertionDataNodeListener.java    |  2 +-
 3 files changed, 39 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 34837424b98..b32479e2f1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -47,7 +47,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Objects;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
@@ -105,14 +105,18 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
           insertTablets.getValue().entrySet()) {
         // needCopyFlag and tablet
         final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
+        final int totalRowSize = tabletEntry.getValue().getLeft();
         for (final Tablet tablet : tabletEntry.getValue().getRight()) {
           boolean success = false;
           for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
+            if (!canAppendTablet(tabletPair.getRight(), tablet)) {
+              continue;
+            }
             if (tabletPair.getLeft()) {
               tabletPair.setRight(copyTablet(tabletPair.getRight()));
               tabletPair.setLeft(Boolean.FALSE);
             }
-            if (tabletPair.getRight().append(tablet, 
tabletEntry.getValue().getLeft())) {
+            if (tabletPair.getRight().append(tablet, totalRowSize)) {
               success = true;
               break;
             }
@@ -203,21 +207,21 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   }
 
   private long constructTabletBatch(final Tablet tablet, final String 
databaseName) {
-    final AtomicLong size = new AtomicLong(0);
     final Pair<Integer, List<Tablet>> currentBatch =
         tableModelTabletMap
-            .computeIfAbsent(
-                databaseName,
-                k -> {
-                  size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
-                  return new HashMap<>();
-                })
+            .computeIfAbsent(databaseName, k -> new HashMap<>())
             .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new 
ArrayList<>()));
     currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
     currentBatch.getRight().add(tablet);
     return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
   }
 
+  private static boolean canAppendTablet(final Tablet target, final Tablet 
source) {
+    return Objects.equals(target.getDeviceId(), source.getDeviceId())
+        && Objects.equals(target.getSchemas(), source.getSchemas())
+        && Objects.equals(target.getColumnTypes(), source.getColumnTypes());
+  }
+
   public static Tablet copyTablet(final Tablet tablet) {
     final Object[] copiedValues = new Object[tablet.getValues().length];
     for (int i = 0; i < tablet.getValues().length; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index b3a8884a146..ce70cf6f6e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -40,10 +40,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -85,8 +85,7 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
   // each endpoint has a batch.
   // This is only used in plain batch since tsfile does not return redirection 
info.
-  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
-      new ConcurrentHashMap<>();
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = 
new HashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
     final boolean usingTsFileBatch =
@@ -182,22 +181,29 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
       getAllNonEmptyAndShouldEmitBatches() {
     final List<Pair<TEndPoint, PipeTabletEventBatch>> 
nonEmptyAndShouldEmitBatches =
-        new ArrayList<>();
+        new ArrayList<>(endPointToBatch.size() + 1);
     if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
       nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
     }
-    endPointToBatch.forEach(
-        (endPoint, batch) -> {
-          if (!batch.isEmpty() && batch.shouldEmit()) {
-            nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
-          }
-        });
+    for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : 
endPointToBatch.entrySet()) {
+      final PipeTabletEventPlainBatch batch = entry.getValue();
+      if (!batch.isEmpty() && batch.shouldEmit()) {
+        nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
+      }
+    }
     return nonEmptyAndShouldEmitBatches;
   }
 
-  public boolean isEmpty() {
-    return defaultBatch.isEmpty()
-        && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
+  public synchronized boolean isEmpty() {
+    if (!defaultBatch.isEmpty()) {
+      return false;
+    }
+    for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+      if (!batch.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   public synchronized void discardEventsOfPipe(
@@ -210,12 +216,13 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(committerKey));
   }
 
-  public int size() {
+  public synchronized int size() {
     try {
-      return defaultBatch.events.size()
-          + endPointToBatch.values().stream()
-              .map(batch -> batch.events.size())
-              .reduce(0, Integer::sum);
+      int size = defaultBatch.events.size();
+      for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+        size += batch.events.size();
+      }
+      return size;
     } catch (final Exception e) {
       LOGGER.warn(
           
DataNodePipeMessages.FAILED_TO_GET_THE_SIZE_OF_PIPETRANSFERBATCHREQBUILDER,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 76c001b2696..7e3c25e8062 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -66,7 +66,7 @@ public class PipeInsertionDataNodeListener {
         });
   }
 
-  public synchronized void stopListenAndAssign(
+  public void stopListenAndAssign(
       final int dataRegionId, final PipeRealtimeDataRegionSource source) {
     PipeDataRegionAssigner assignerToClose = null;
 

Reply via email to