This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d0928eb15f6 Pipe: stop pipe using restarting strategy to unpin the
wal's reference count to avoid WAL stacking (#11971)
d0928eb15f6 is described below
commit d0928eb15f68e21d85218520864cd07e92e22ab6
Author: Itami Sho <[email protected]>
AuthorDate: Tue Jan 30 20:37:24 2024 +0800
Pipe: stop pipe using restarting strategy to unpin the wal's reference
count to avoid WAL stacking (#11971)
---
.../commons/pipe/agent/task/PipeTaskAgent.java | 31 +++++++++-------------
1 file changed, 13 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 3b7bf7712f7..9d8996053e0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -555,34 +555,29 @@ public abstract class PipeTaskAgent {
}
protected void stopPipe(String pipeName, long creationTime) {
- final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-
- if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
- return;
- }
+ final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
- // Get pipe tasks
- final Map<TConsensusGroupId, PipeTask> pipeTasks =
- pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
- if (pipeTasks == null) {
+ if (!checkBeforeStopPipe(pipeMeta, pipeName, creationTime)) {
LOGGER.info(
- "Pipe {} (creation time = {}) has already been dropped or has not
been created. "
- + "Skip stopping.",
- pipeName,
- creationTime);
+ "Stop Pipe: Pipe {} has already been dropped or has not been
created. Skip stopping.",
+ pipeName);
return;
}
- // Trigger stop() method for each pipe task by parallel stream
+ // 1. Drop the pipe task
final long startTime = System.currentTimeMillis();
- pipeTasks.values().parallelStream().forEach(PipeTask::stop);
+ handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
+
+ // 2. Set pipe meta status to STOPPED
+ pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+
+ // 3. create a new pipe with the same pipeMeta
+ createPipe(pipeMeta);
+
LOGGER.info(
"Stop all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);
-
- // Set pipe meta status to STOPPED
- existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
////////////////////////// Checker //////////////////////////