This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new de0a48fc6e6 Optimize pipe event batching and listener stop (#17864) (#17885)
de0a48fc6e6 is described below
commit de0a48fc6e61874739aecc3e813515e47b935de0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 10:39:26 2026 +0800
Optimize pipe event batching and listener stop (#17864) (#17885)
---
.../batch/PipeTransferBatchReqBuilder.java | 43 +++++++++++++---------
.../listener/PipeInsertionDataNodeListener.java | 2 +-
2 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 45264138596..52827ac0008 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -39,10 +39,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -84,8 +84,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
// If the leader cache is enabled, the batch will be divided by the leader endpoint,
// each endpoint has a batch.
// This is only used in plain batch since tsfile does not return redirection info.
- private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
- new ConcurrentHashMap<>();
+ private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
new HashMap<>();
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
final boolean usingTsFileBatch =
@@ -178,22 +177,29 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyAndShouldEmitBatches() {
final List<Pair<TEndPoint, PipeTabletEventBatch>>
nonEmptyAndShouldEmitBatches =
- new ArrayList<>();
+ new ArrayList<>(endPointToBatch.size() + 1);
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
}
- endPointToBatch.forEach(
- (endPoint, batch) -> {
- if (!batch.isEmpty() && batch.shouldEmit()) {
- nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
- }
- });
+ for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry :
endPointToBatch.entrySet()) {
+ final PipeTabletEventPlainBatch batch = entry.getValue();
+ if (!batch.isEmpty() && batch.shouldEmit()) {
+ nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
+ }
+ }
return nonEmptyAndShouldEmitBatches;
}
- public boolean isEmpty() {
- return defaultBatch.isEmpty()
- &&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
+ public synchronized boolean isEmpty() {
+ if (!defaultBatch.isEmpty()) {
+ return false;
+ }
+ for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+ if (!batch.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
}
public synchronized void discardEventsOfPipe(
@@ -206,12 +212,13 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(committerKey));
}
- public int size() {
+ public synchronized int size() {
try {
- return defaultBatch.events.size()
- + endPointToBatch.values().stream()
- .map(batch -> batch.events.size())
- .reduce(0, Integer::sum);
+ int size = defaultBatch.events.size();
+ for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+ size += batch.events.size();
+ }
+ return size;
} catch (final Exception e) {
LOGGER.warn(
"Failed to get the size of PipeTransferBatchReqBuilder, return 0.
Exception: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index ad3586df830..9345d257462 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -63,7 +63,7 @@ public class PipeInsertionDataNodeListener {
});
}
- public synchronized void stopListenAndAssign(
+ public void stopListenAndAssign(
final String dataRegionId, final PipeRealtimeDataRegionSource source) {
PipeDataRegionAssigner assignerToClose = null;