This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d0928eb15f6 Pipe: stop pipe using restarting strategy to unpin the wal's reference count to avoid WAL stacking (#11971)
d0928eb15f6 is described below

commit d0928eb15f68e21d85218520864cd07e92e22ab6
Author: Itami Sho <[email protected]>
AuthorDate: Tue Jan 30 20:37:24 2024 +0800

    Pipe: stop pipe using restarting strategy to unpin the wal's reference count to avoid WAL stacking (#11971)
---
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 31 +++++++++-------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 3b7bf7712f7..9d8996053e0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -555,34 +555,29 @@ public abstract class PipeTaskAgent {
   }
 
   protected void stopPipe(String pipeName, long creationTime) {
-    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-
-    if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
-      return;
-    }
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
-    // Get pipe tasks
-    final Map<TConsensusGroupId, PipeTask> pipeTasks =
-        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
-    if (pipeTasks == null) {
+    if (!checkBeforeStopPipe(pipeMeta, pipeName, creationTime)) {
       LOGGER.info(
-          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
-              + "Skip stopping.",
-          pipeName,
-          creationTime);
+          "Stop Pipe: Pipe {} has already been dropped or has not been created. Skip stopping.",
+          pipeName);
       return;
     }
 
-    // Trigger stop() method for each pipe task by parallel stream
+    // 1. Drop the pipe task
     final long startTime = System.currentTimeMillis();
-    pipeTasks.values().parallelStream().forEach(PipeTask::stop);
+    handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
+
+    // 2. Set pipe meta status to STOPPED
+    pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+
+    // 3. create a new pipe with the same pipeMeta
+    createPipe(pipeMeta);
+
     LOGGER.info(
         "Stop all pipe tasks on Pipe {} successfully within {} ms",
         pipeName,
         System.currentTimeMillis() - startTime);
-
-    // Set pipe meta status to STOPPED
-    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
 
   ////////////////////////// Checker //////////////////////////

Reply via email to