This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2afbe178f4301f6dbe8164d957bbc0492c929eb8 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 9 12:13:21 2026 +0800 Optimize pipe event batching and listener stop (#17864) --- .../evolvable/batch/PipeTabletEventPlainBatch.java | 22 ++++++----- .../batch/PipeTransferBatchReqBuilder.java | 43 +++++++++++++--------- .../listener/PipeInsertionDataNodeListener.java | 2 +- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 34837424b98..b32479e2f1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -47,7 +47,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Objects; public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @@ -105,14 +105,18 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { insertTablets.getValue().entrySet()) { // needCopyFlag and tablet final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>(); + final int totalRowSize = tabletEntry.getValue().getLeft(); for (final Tablet tablet : tabletEntry.getValue().getRight()) { boolean success = false; for (final Pair<Boolean, Tablet> tabletPair : batchTablets) { + if (!canAppendTablet(tabletPair.getRight(), tablet)) { + continue; + } if (tabletPair.getLeft()) { tabletPair.setRight(copyTablet(tabletPair.getRight())); tabletPair.setLeft(Boolean.FALSE); } - if (tabletPair.getRight().append(tablet, tabletEntry.getValue().getLeft())) { + if (tabletPair.getRight().append(tablet, totalRowSize)) { success = true; break; } @@ -203,21 +207,21 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { } private long constructTabletBatch(final Tablet tablet, final String databaseName) { - final AtomicLong size = new AtomicLong(0); final Pair<Integer, List<Tablet>> currentBatch = tableModelTabletMap - .computeIfAbsent( - databaseName, - k -> { - size.addAndGet(RamUsageEstimator.sizeOf(databaseName)); - return new HashMap<>(); - }) + .computeIfAbsent(databaseName, k -> new HashMap<>()) .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>())); currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize()); currentBatch.getRight().add(tablet); return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } + private static boolean canAppendTablet(final Tablet target, final Tablet source) { + return Objects.equals(target.getDeviceId(), source.getDeviceId()) + && Objects.equals(target.getSchemas(), source.getSchemas()) + && Objects.equals(target.getColumnTypes(), source.getColumnTypes()); + } + public static Tablet copyTablet(final Tablet tablet) { final Object[] copiedValues = new Object[tablet.getValues().length]; for (int i = 0; i < tablet.getValues().length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index b3a8884a146..ce70cf6f6e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -40,10 +40,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY; @@ -85,8 +85,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { // If the leader cache is enabled, the batch will be divided by the leader endpoint, // each endpoint has a batch. // This is only used in plain batch since tsfile does not return redirection info. - private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = - new ConcurrentHashMap<>(); + private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = new HashMap<>(); public PipeTransferBatchReqBuilder(final PipeParameters parameters) { final boolean usingTsFileBatch = @@ -182,22 +181,29 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>> getAllNonEmptyAndShouldEmitBatches() { final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches = - new ArrayList<>(); + new ArrayList<>(endPointToBatch.size() + 1); if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) { nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch)); } - endPointToBatch.forEach( - (endPoint, batch) -> { - if (!batch.isEmpty() && batch.shouldEmit()) { - nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch)); - } - }); + for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : endPointToBatch.entrySet()) { + final PipeTabletEventPlainBatch batch = entry.getValue(); + if (!batch.isEmpty() && batch.shouldEmit()) { + nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch)); + } + } return nonEmptyAndShouldEmitBatches; } - public boolean isEmpty() { - return defaultBatch.isEmpty() - && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); + public synchronized boolean isEmpty() { + if (!defaultBatch.isEmpty()) { + return false; + } + for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) { + if (!batch.isEmpty()) { + return false; + } + } + return true; } public synchronized void discardEventsOfPipe( @@ -210,12 +216,13 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey)); } - public int size() { + public synchronized int size() { try { - return defaultBatch.events.size() - + endPointToBatch.values().stream() - .map(batch -> batch.events.size()) - .reduce(0, Integer::sum); + int size = defaultBatch.events.size(); + for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) { + size += batch.events.size(); + } + return size; } catch (final Exception e) { LOGGER.warn( DataNodePipeMessages.FAILED_TO_GET_THE_SIZE_OF_PIPETRANSFERBATCHREQBUILDER, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 76c001b2696..7e3c25e8062 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -66,7 +66,7 @@ public class PipeInsertionDataNodeListener { }); } - public synchronized void stopListenAndAssign( + public void stopListenAndAssign( final int dataRegionId, final PipeRealtimeDataRegionSource source) { PipeDataRegionAssigner assignerToClose = null;
