This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch pipe-serialize-sink-by-region
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3789288816534be05e68810fe3d7abb8d61be655
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:51:02 2026 +0800

    Pipe: serialize sink transfers by region
---
 .../manual/enhanced/IoTDBPipeSinkParallelIT.java   |  1 +
 .../auto/basic/IoTDBPipeSinkParallelIT.java        |  1 +
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 46 ++++++++++-----
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 49 +++++++++++++++-
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    | 65 ++++++++++++++++++++++
 .../pipe/config/constant/PipeSinkConstant.java     |  4 ++
 .../commons/pipe/sink/protocol/IoTDBSink.java      | 11 ++++
 7 files changed, 159 insertions(+), 18 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
index 85f3b93aa76..ae466349d72 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
@@ -80,6 +80,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeTableModelDualManualIT
       connectorAttributes.put("connector.batch.enable", "true");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.serialize-by-region", "false");
       connectorAttributes.put("connector.parallel.tasks", "3");
 
       final TSStatus status =
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
index c21e6456ef1..df8e27a8546 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeDualTreeModelAutoIT {
       sinkAttributes.put("sink.batch.enable", "false");
       sinkAttributes.put("sink.ip", receiverIp);
       sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+      sinkAttributes.put("sink.serialize-by-region", "false");
       sinkAttributes.put("sink.parallel.tasks", "3");
 
       final TSStatus status =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 01552eec5ae..a82e9432c09 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -93,30 +93,42 @@ public class PipeSinkSubtaskManager {
 
     final int sinkNum;
     boolean realTimeFirst = false;
+    boolean serializeByRegion = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
     final String attributeDisplayString = 
generateAttributeDisplayString(pipeSinkParameters);
     if (isDataRegionSink) {
-      sinkNum =
-          pipeSinkParameters.getIntOrDefault(
+      serializeByRegion =
+          pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
-                  PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
-                  PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
-                      pipeSinkParameters
-                          .getStringOrDefault(
-                              Arrays.asList(
-                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
-                          .toLowerCase())
-                  ? 1
-                  : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+                  PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY,
+                  PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY),
+              PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
+      sinkNum =
+          serializeByRegion
+              ? 1
+              : pipeSinkParameters.getIntOrDefault(
+                  Arrays.asList(
+                      PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+                      PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
+                  PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
+                          pipeSinkParameters
+                              .getStringOrDefault(
+                                  Arrays.asList(
+                                      PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
+                                  
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+                              .toLowerCase())
+                      ? 1
+                      : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
           pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-      attributeSortedString = "data_" + attributeSortedString;
+      attributeSortedString =
+          serializeByRegion
+              ? "data_region_" + environment.getRegionId() + "_" + 
attributeSortedString
+              : "data_" + attributeSortedString;
     } else {
       // Do not allow parallel tasks for schema region connectors
       // to avoid the potential disorder of the schema region data transfer
@@ -124,7 +136,11 @@ public class PipeSinkSubtaskManager {
       attributeSortedString = "schema_" + attributeSortedString;
     }
     final String attributeDisplayStringWithPrefix =
-        isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + 
attributeDisplayString;
+        isDataRegionSink
+            ? serializeByRegion
+                ? "data_region_" + environment.getRegionId() + "_" + 
attributeDisplayString
+                : "data_" + attributeDisplayString
+            : "schema_" + attributeDisplayString;
     environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index b8b169b1f6a..4db4942139b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -129,6 +129,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
+  private final Object pendingHandlersMonitor = new Object();
 
   private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
@@ -207,6 +208,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   @Override
   public void heartbeat() throws Exception {
+    waitForNoPendingHandlerIfNecessary();
     if (!isClosed()) {
       syncSink.heartbeat();
     }
@@ -214,6 +216,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    waitForNoPendingHandlerIfNecessary();
     transferQueuedEventsIfNecessary(false);
 
     if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
@@ -346,6 +349,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
+      waitForNoPendingHandlerIfNecessary();
       client = clientManager.borrowClient(endPoint);
       pipeTransferTabletBatchEventHandler.transfer(client);
     } catch (final Exception ex) {
@@ -359,6 +363,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
+      waitForNoPendingHandlerIfNecessary();
       client = clientManager.borrowClient(deviceId);
       pipeTransferInsertNodeReqHandler.transfer(client);
     } catch (final Exception ex) {
@@ -371,6 +376,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       final String deviceId, final PipeTransferTabletRawEventHandler 
pipeTransferTabletReqHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
+      waitForNoPendingHandlerIfNecessary();
       client = clientManager.borrowClient(deviceId);
       pipeTransferTabletReqHandler.transfer(client);
     } catch (final Exception ex) {
@@ -381,6 +387,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    waitForNoPendingHandlerIfNecessary();
     transferQueuedEventsIfNecessary(false);
     transferBatchedEventsIfNecessary();
 
@@ -442,6 +449,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   }
 
   private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler) {
+    waitForNoPendingHandlerIfNecessary();
     transferTsFileCounter.incrementAndGet();
     CompletableFuture<Void> completableFuture =
         CompletableFuture.supplyAsync(
@@ -460,7 +468,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
             },
             transferTsFileClientManager.getExecutor());
 
-    if (PipeConfig.getInstance().isTransferTsFileSync()) {
+    if (isSerializeByRegion || 
PipeConfig.getInstance().isTransferTsFileSync()) {
       try {
         completableFuture.get();
       } catch (final Exception e) {
@@ -493,6 +501,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   @Override
   public void transfer(final Event event) throws Exception {
+    waitForNoPendingHandlerIfNecessary();
     transferQueuedEventsIfNecessary(true);
     transferBatchedEventsIfNecessary();
 
@@ -506,6 +515,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       return;
     }
 
+    waitForNoPendingHandlerIfNecessary();
     syncSink.transfer(event);
   }
 
@@ -620,6 +630,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         }
       }
 
+      waitForNoPendingHandlerIfNecessary();
+
       // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
       if (System.currentTimeMillis() - retryStartTime
           > 
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
@@ -798,6 +810,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   // synchronized to avoid close connector when transfer event
   public synchronized void close() {
     isClosed.set(true);
+    synchronized (pendingHandlersMonitor) {
+      pendingHandlersMonitor.notifyAll();
+    }
 
     syncSink.close();
 
@@ -866,7 +881,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   }
 
   public void trackHandler(final PipeTransferTrackableHandler handler) {
-    pendingHandlers.put(handler, handler);
+    synchronized (pendingHandlersMonitor) {
+      pendingHandlers.put(handler, handler);
+      pendingHandlersMonitor.notifyAll();
+    }
   }
 
   public void eliminateHandler(
@@ -875,13 +893,38 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       handler.closeClient();
     }
     handler.close();
-    pendingHandlers.remove(handler);
+    synchronized (pendingHandlersMonitor) {
+      pendingHandlers.remove(handler);
+      pendingHandlersMonitor.notifyAll();
+    }
   }
 
   public boolean hasPendingHandlers() {
     return !pendingHandlers.isEmpty();
   }
 
+  public boolean isSerializeByRegion() {
+    return isSerializeByRegion;
+  }
+
+  private void waitForNoPendingHandlerIfNecessary() {
+    if (!isSerializeByRegion) {
+      return;
+    }
+
+    synchronized (pendingHandlersMonitor) {
+      while (!isClosed.get() && !pendingHandlers.isEmpty()) {
+        try {
+          pendingHandlersMonitor.wait();
+        } catch (final InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new PipeException(
+              "Interrupted when waiting for previous async sink transfer to 
finish.", e);
+        }
+      }
+    }
+  }
+
   public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
     this.transferTsFileCounter = transferTsFileCounter;
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index cf311639ee9..00d2a40aa0f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -109,6 +109,71 @@ public class PipeSinkTest {
     }
   }
 
+  @Test
+  public void testAsyncSinkSerializeByRegionConfig() throws Exception {
+    try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6668");
+                }
+              });
+
+      connector.validate(new PipeParameterValidator(parameters));
+      connector.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      Assert.assertTrue(connector.isSerializeByRegion());
+    }
+
+    try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6668");
+                  put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, 
"false");
+                }
+              });
+
+      connector.validate(new PipeParameterValidator(parameters));
+      connector.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      Assert.assertFalse(connector.isSerializeByRegion());
+    }
+
+    try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6668");
+                  put(PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY, "false");
+                }
+              });
+
+      connector.validate(new PipeParameterValidator(parameters));
+      connector.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      Assert.assertFalse(connector.isSerializeByRegion());
+    }
+  }
+
   @Test
   public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws 
Exception {
     try (final IoTDBDataRegionAsyncSink connector = new 
IoTDBDataRegionAsyncSink()) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 058b17e2f4f..4fd0f1e82b8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -69,6 +69,10 @@ public class PipeSinkConstant {
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
   public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
 
+  public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = 
"connector.serialize-by-region";
+  public static final String SINK_SERIALIZE_BY_REGION_KEY = 
"sink.serialize-by-region";
+  public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = 
true;
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index b5662aeec2c..1ea8410c0e0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -192,6 +192,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent
   protected String sinkTaskId;
   protected Timer compressionTimer;
   protected boolean isRealtimeFirst;
+  protected boolean isSerializeByRegion;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -498,6 +499,16 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
     LOGGER.info(
         "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
isRealtimeFirst);
+    isSerializeByRegion =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY,
+                PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY),
+            PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
+    LOGGER.info(
+        "IoTDBSink {} = {}",
+        PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY,
+        isSerializeByRegion);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters 
parameters)

Reply via email to