This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aeeb0baf3a0 Pipe: Utilize parallelStream for concurrent execution of 
create, start, stop, and drop pipe tasks to enhance performance (#11892)
aeeb0baf3a0 is described below

commit aeeb0baf3a035bdec355a034c3cd1fc0d29de43a
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Jan 14 19:04:01 2024 +0800

    Pipe: Utilize parallelStream for concurrent execution of create, start, stop, and drop pipe tasks to enhance performance (#11892)
    
    Currently, when a pipe is created and the cluster already holds
    historical data, the pipe is started automatically. Starting it involves
    **serially** extracting the historical data in each data region. When
    there is a large amount of data, this can time out and cause pipe
    creation to fail. To address this, the create, start, stop, and drop
    operations of the pipe tasks are now executed in parallel using
    `parallelStream` (inspired by @SteveYurongSu).
---
 .../iotdb/db/pipe/task/PipeDataNodeTask.java       | 30 ++++++++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 66 +++++++++++++++-------
 2 files changed, 76 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
index 677cdec9744..86f9462bdcc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
@@ -23,8 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class PipeDataNodeTask implements PipeTask {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeTask.class);
+
   protected final String pipeName;
   protected final TConsensusGroupId regionId;
 
@@ -48,30 +53,50 @@ public class PipeDataNodeTask implements PipeTask {
 
   @Override
   public void create() {
+    final long startTime = System.currentTimeMillis();
     extractorStage.create();
     processorStage.create();
     connectorStage.create();
+    LOGGER.info(
+        "Create pipe DN task {} successfully within {} ms",
+        this,
+        System.currentTimeMillis() - startTime);
   }
 
   @Override
   public void drop() {
+    final long startTime = System.currentTimeMillis();
     extractorStage.drop();
     processorStage.drop();
     connectorStage.drop();
+    LOGGER.info(
+        "Drop pipe DN task {} successfully within {} ms",
+        this,
+        System.currentTimeMillis() - startTime);
   }
 
   @Override
   public void start() {
+    final long startTime = System.currentTimeMillis();
     extractorStage.start();
     processorStage.start();
     connectorStage.start();
+    LOGGER.info(
+        "Start pipe DN task {} successfully within {} ms",
+        this,
+        System.currentTimeMillis() - startTime);
   }
 
   @Override
   public void stop() {
+    final long startTime = System.currentTimeMillis();
     extractorStage.stop();
     processorStage.stop();
     connectorStage.stop();
+    LOGGER.info(
+        "Stop pipe DN task {} successfully within {} ms",
+        this,
+        System.currentTimeMillis() - startTime);
   }
 
   public TConsensusGroupId getRegionId() {
@@ -81,4 +106,9 @@ public class PipeDataNodeTask implements PipeTask {
   public String getPipeName() {
     return pipeName;
   }
+
+  @Override
+  public String toString() {
+    return pipeName + "@" + regionId;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 8bc4c33810e..79575a6bb17 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -408,11 +408,17 @@ public abstract class PipeTaskAgent {
       dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
     }
 
-    // Create pipe tasks and trigger create() method for each pipe task
+    // Create pipe tasks
     final Map<TConsensusGroupId, PipeTask> pipeTasks = 
buildPipeTasks(pipeMetaFromCoordinator);
-    for (PipeTask pipeTask : pipeTasks.values()) {
-      pipeTask.create();
-    }
+
+    // Trigger create() method for each pipe task by parallel stream
+    final long startTime = System.currentTimeMillis();
+    pipeTasks.values().parallelStream().forEach(PipeTask::create);
+    LOGGER.info(
+        "Create all pipe tasks on Pipe {} successfully within {} ms",
+        pipeName,
+        System.currentTimeMillis() - startTime);
+
     pipeTaskManager.addPipeTasks(pipeMetaFromCoordinator.getStaticMeta(), 
pipeTasks);
 
     // No matter the pipe status from coordinator is RUNNING or STOPPED, we 
always set the status
@@ -444,7 +450,7 @@ public abstract class PipeTaskAgent {
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
 
-    // Drop pipe tasks and trigger drop() method for each pipe task
+    // Drop pipe tasks
     final Map<TConsensusGroupId, PipeTask> pipeTasks =
         pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
     if (pipeTasks == null) {
@@ -455,9 +461,14 @@ public abstract class PipeTaskAgent {
           creationTime);
       return;
     }
-    for (PipeTask pipeTask : pipeTasks.values()) {
-      pipeTask.drop();
-    }
+
+    // Trigger drop() method for each pipe task by parallel stream
+    final long startTime = System.currentTimeMillis();
+    pipeTasks.values().parallelStream().forEach(PipeTask::drop);
+    LOGGER.info(
+        "Drop all pipe tasks on Pipe {} successfully within {} ms",
+        pipeName,
+        System.currentTimeMillis() - startTime);
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
@@ -475,7 +486,7 @@ public abstract class PipeTaskAgent {
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
 
-    // Drop pipe tasks and trigger drop() method for each pipe task
+    // Drop pipe tasks
     final Map<TConsensusGroupId, PipeTask> pipeTasks =
         pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
     if (pipeTasks == null) {
@@ -483,9 +494,14 @@ public abstract class PipeTaskAgent {
           "Pipe {} has already been dropped or has not been created. Skip 
dropping.", pipeName);
       return;
     }
-    for (PipeTask pipeTask : pipeTasks.values()) {
-      pipeTask.drop();
-    }
+
+    // Trigger drop() method for each pipe task by parallel stream
+    final long startTime = System.currentTimeMillis();
+    pipeTasks.values().parallelStream().forEach(PipeTask::drop);
+    LOGGER.info(
+        "Drop all pipe tasks on Pipe {} successfully within {} ms",
+        pipeName,
+        System.currentTimeMillis() - startTime);
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
@@ -498,7 +514,7 @@ public abstract class PipeTaskAgent {
       return;
     }
 
-    // Trigger start() method for each pipe task
+    // Get pipe tasks
     final Map<TConsensusGroupId, PipeTask> pipeTasks =
         pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
     if (pipeTasks == null) {
@@ -509,9 +525,14 @@ public abstract class PipeTaskAgent {
           creationTime);
       return;
     }
-    for (PipeTask pipeTask : pipeTasks.values()) {
-      pipeTask.start();
-    }
+
+    // Trigger start() method for each pipe task by parallel stream
+    final long startTime = System.currentTimeMillis();
+    pipeTasks.values().parallelStream().forEach(PipeTask::start);
+    LOGGER.info(
+        "Start all pipe tasks on Pipe {} successfully within {} ms",
+        pipeName,
+        System.currentTimeMillis() - startTime);
 
     // Set pipe meta status to RUNNING
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
@@ -530,7 +551,7 @@ public abstract class PipeTaskAgent {
       return;
     }
 
-    // Trigger stop() method for each pipe task
+    // Get pipe tasks
     final Map<TConsensusGroupId, PipeTask> pipeTasks =
         pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
     if (pipeTasks == null) {
@@ -541,9 +562,14 @@ public abstract class PipeTaskAgent {
           creationTime);
       return;
     }
-    for (PipeTask pipeTask : pipeTasks.values()) {
-      pipeTask.stop();
-    }
+
+    // Trigger stop() method for each pipe task by parallel stream
+    final long startTime = System.currentTimeMillis();
+    pipeTasks.values().parallelStream().forEach(PipeTask::stop);
+    LOGGER.info(
+        "Stop all pipe tasks on Pipe {} successfully within {} ms",
+        pipeName,
+        System.currentTimeMillis() - startTime);
 
     // Set pipe meta status to STOPPED
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);

Reply via email to