This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aeeb0baf3a0 Pipe: Utilize parallelStream for concurrent execution of
create, start, stop, and drop pipe tasks to enhance performance (#11892)
aeeb0baf3a0 is described below
commit aeeb0baf3a035bdec355a034c3cd1fc0d29de43a
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Jan 14 19:04:01 2024 +0800
Pipe: Utilize parallelStream for concurrent execution of create, start,
stop, and drop pipe tasks to enhance performance (#11892)
Currently, when creating a pipe, if there is historical data in the
cluster, the pipe will be automatically started. This involves **serially**
extracting historical data in each data region. When dealing with large data,
timeouts may occur, leading to pipe creation failure. To address this, we are
considering parallelizing the above operations using `parallelStream` (inspired
by @SteveYurongSu).
---
.../iotdb/db/pipe/task/PipeDataNodeTask.java | 30 ++++++++++
.../commons/pipe/agent/task/PipeTaskAgent.java | 66 +++++++++++++++-------
2 files changed, 76 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
index 677cdec9744..86f9462bdcc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
@@ -23,8 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class PipeDataNodeTask implements PipeTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeTask.class);
+
protected final String pipeName;
protected final TConsensusGroupId regionId;
@@ -48,30 +53,50 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void create() {
+ final long startTime = System.currentTimeMillis();
extractorStage.create();
processorStage.create();
connectorStage.create();
+ LOGGER.info(
+ "Create pipe DN task {} successfully within {} ms",
+ this,
+ System.currentTimeMillis() - startTime);
}
@Override
public void drop() {
+ final long startTime = System.currentTimeMillis();
extractorStage.drop();
processorStage.drop();
connectorStage.drop();
+ LOGGER.info(
+ "Drop pipe DN task {} successfully within {} ms",
+ this,
+ System.currentTimeMillis() - startTime);
}
@Override
public void start() {
+ final long startTime = System.currentTimeMillis();
extractorStage.start();
processorStage.start();
connectorStage.start();
+ LOGGER.info(
+ "Start pipe DN task {} successfully within {} ms",
+ this,
+ System.currentTimeMillis() - startTime);
}
@Override
public void stop() {
+ final long startTime = System.currentTimeMillis();
extractorStage.stop();
processorStage.stop();
connectorStage.stop();
+ LOGGER.info(
+ "Stop pipe DN task {} successfully within {} ms",
+ this,
+ System.currentTimeMillis() - startTime);
}
public TConsensusGroupId getRegionId() {
@@ -81,4 +106,9 @@ public class PipeDataNodeTask implements PipeTask {
public String getPipeName() {
return pipeName;
}
+
+ @Override
+ public String toString() {
+ return pipeName + "@" + regionId;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 8bc4c33810e..79575a6bb17 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -408,11 +408,17 @@ public abstract class PipeTaskAgent {
dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
}
- // Create pipe tasks and trigger create() method for each pipe task
+ // Create pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
buildPipeTasks(pipeMetaFromCoordinator);
- for (PipeTask pipeTask : pipeTasks.values()) {
- pipeTask.create();
- }
+
+ // Trigger create() method for each pipe task by parallel stream
+ final long startTime = System.currentTimeMillis();
+ pipeTasks.values().parallelStream().forEach(PipeTask::create);
+ LOGGER.info(
+ "Create all pipe tasks on Pipe {} successfully within {} ms",
+ pipeName,
+ System.currentTimeMillis() - startTime);
+
pipeTaskManager.addPipeTasks(pipeMetaFromCoordinator.getStaticMeta(),
pipeTasks);
// No matter the pipe status from coordinator is RUNNING or STOPPED, we
always set the status
@@ -444,7 +450,7 @@ public abstract class PipeTaskAgent {
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- // Drop pipe tasks and trigger drop() method for each pipe task
+ // Drop pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
@@ -455,9 +461,14 @@ public abstract class PipeTaskAgent {
creationTime);
return;
}
- for (PipeTask pipeTask : pipeTasks.values()) {
- pipeTask.drop();
- }
+
+ // Trigger drop() method for each pipe task by parallel stream
+ final long startTime = System.currentTimeMillis();
+ pipeTasks.values().parallelStream().forEach(PipeTask::drop);
+ LOGGER.info(
+ "Drop all pipe tasks on Pipe {} successfully within {} ms",
+ pipeName,
+ System.currentTimeMillis() - startTime);
// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
@@ -475,7 +486,7 @@ public abstract class PipeTaskAgent {
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- // Drop pipe tasks and trigger drop() method for each pipe task
+ // Drop pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
@@ -483,9 +494,14 @@ public abstract class PipeTaskAgent {
"Pipe {} has already been dropped or has not been created. Skip
dropping.", pipeName);
return;
}
- for (PipeTask pipeTask : pipeTasks.values()) {
- pipeTask.drop();
- }
+
+ // Trigger drop() method for each pipe task by parallel stream
+ final long startTime = System.currentTimeMillis();
+ pipeTasks.values().parallelStream().forEach(PipeTask::drop);
+ LOGGER.info(
+ "Drop all pipe tasks on Pipe {} successfully within {} ms",
+ pipeName,
+ System.currentTimeMillis() - startTime);
// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
@@ -498,7 +514,7 @@ public abstract class PipeTaskAgent {
return;
}
- // Trigger start() method for each pipe task
+ // Get pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
@@ -509,9 +525,14 @@ public abstract class PipeTaskAgent {
creationTime);
return;
}
- for (PipeTask pipeTask : pipeTasks.values()) {
- pipeTask.start();
- }
+
+ // Trigger start() method for each pipe task by parallel stream
+ final long startTime = System.currentTimeMillis();
+ pipeTasks.values().parallelStream().forEach(PipeTask::start);
+ LOGGER.info(
+ "Start all pipe tasks on Pipe {} successfully within {} ms",
+ pipeName,
+ System.currentTimeMillis() - startTime);
// Set pipe meta status to RUNNING
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
@@ -530,7 +551,7 @@ public abstract class PipeTaskAgent {
return;
}
- // Trigger stop() method for each pipe task
+ // Get pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
@@ -541,9 +562,14 @@ public abstract class PipeTaskAgent {
creationTime);
return;
}
- for (PipeTask pipeTask : pipeTasks.values()) {
- pipeTask.stop();
- }
+
+ // Trigger stop() method for each pipe task by parallel stream
+ final long startTime = System.currentTimeMillis();
+ pipeTasks.values().parallelStream().forEach(PipeTask::stop);
+ LOGGER.info(
+ "Stop all pipe tasks on Pipe {} successfully within {} ms",
+ pipeName,
+ System.currentTimeMillis() - startTime);
// Set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);