This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ef6dc0a8e14 Pipe: enhance the idempotence of start / stop pipe  
(#11713)
ef6dc0a8e14 is described below

commit ef6dc0a8e149cfaee705bc7795ada75f8c92025c
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Dec 16 16:49:07 2023 +0800

    Pipe: enhance the idempotence of start / stop pipe  (#11713)
    
    As title. This commit is a follow-up to #11690, from now onwards, no-op 
start pipe and stop pipe will directly skip the entire procedure execution, 
avoiding the impact of rollback and other exceptional situations. At the same 
time, the procedure result is set to facilitate differentiation on CN side for 
identifying no-op start pipe and stop pipe.
    
    It is important to note that when starting a RUNNING pipe or stopping a 
STOPPED pipe, we need to consider the `isStoppedByRuntimeException` status of 
the pipe. Otherwise, certain procedures might be mistakenly skipped.
---
 .../iotdb/confignode/manager/ProcedureManager.java | 17 ++++---
 .../manager/pipe/task/PipeTaskCoordinator.java     | 10 +----
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 52 +++++++++++-----------
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 18 +++++++-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  3 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |  3 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  3 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  4 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |  4 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       | 24 +++-------
 .../impl/pipe/task/StopPipeProcedureV2.java        | 24 +++-------
 11 files changed, 78 insertions(+), 84 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index af3adc40c99..d616b109660 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -105,6 +105,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -753,7 +754,7 @@ public class ProcedureManager {
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
+        return statusList.get(0);
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
@@ -770,7 +771,7 @@ public class ProcedureManager {
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
+        return statusList.get(0);
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
@@ -787,7 +788,7 @@ public class ProcedureManager {
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
+        return statusList.get(0);
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
@@ -804,7 +805,7 @@ public class ProcedureManager {
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
+        return statusList.get(0);
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
@@ -924,7 +925,13 @@ public class ProcedureManager {
         continue;
       }
       if (finishedProcedure.isSuccess()) {
-        statusList.add(StatusUtils.OK);
+        if (Objects.nonNull(finishedProcedure.getResult())) {
+          statusList.add(
+              RpcUtils.getStatus(
+                  TSStatusCode.SUCCESS_STATUS, 
Arrays.toString(finishedProcedure.getResult())));
+        } else {
+          statusList.add(StatusUtils.OK);
+        }
       } else {
         if (finishedProcedure.getException().getCause() instanceof 
IoTDBException) {
           IoTDBException e = (IoTDBException) 
finishedProcedure.getException().getCause();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index f677fecb64d..e54710c167b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -120,15 +120,9 @@ public class PipeTaskCoordinator {
   public TSStatus stopPipe(String pipeName) {
     final boolean isStoppedByRuntimeException = 
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
     final TSStatus status = 
configManager.getProcedureManager().stopPipe(pipeName);
-    if (status == RpcUtils.SUCCESS_STATUS) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       if (isStoppedByRuntimeException) {
-        // Under normal circumstances, this branch is not executed because when
-        // `isStoppedByRuntimeException` is true, the pipe status is most 
likely to be STOPPED,
-        // unless this method is called between the execution of 
`autoRestartWithLock` and
-        // `handleSuccessfulRestartWithLock` in `PipeMetaSyncer` (in this 
case, the pipe status is
-        // RUNNING and `isStoppedByRuntimeException` is true).
-
-        // Even if return status is success, it doesn't imply the success of 
the
+        // Even if the return status is success, it doesn't imply the success 
of the
         // `executeFromOperateOnDataNodes` phase of stopping pipe. However, we 
still need to set
         // `isStoppedByRuntimeException` to false to avoid auto-restart. 
Meanwhile,
         // `isStoppedByRuntimeException` does not need to be synchronized with 
DNs.
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index a71f7deb2d3..aa9a985973e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -158,18 +158,16 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  /** @return true if the pipe status is RUNNING before starting the pipe */
-  public boolean checkBeforeStartPipe(String pipeName) throws PipeException {
+  public void checkBeforeStartPipe(String pipeName) throws PipeException {
     acquireReadLock();
     try {
-      return checkBeforeStartPipeInternal(pipeName);
+      checkBeforeStartPipeInternal(pipeName);
     } finally {
       releaseReadLock();
     }
   }
 
-  /** @return true if the pipe status is RUNNING before starting the pipe */
-  private boolean checkBeforeStartPipeInternal(String pipeName) throws 
PipeException {
+  private void checkBeforeStartPipeInternal(String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe does not exist", 
pipeName);
@@ -178,34 +176,24 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
-    if (pipeStatus == PipeStatus.RUNNING) {
-      final String exceptionMessage =
-          String.format("Failed to start pipe %s, the pipe is already 
running", pipeName);
-      LOGGER.warn(exceptionMessage);
-      return true;
-    }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe is already 
dropped", pipeName);
       LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
-
-    return false;
   }
 
-  /** @return true if the pipe status is STOPPED before stopping the pipe */
-  public boolean checkBeforeStopPipe(String pipeName) throws PipeException {
+  public void checkBeforeStopPipe(String pipeName) throws PipeException {
     acquireReadLock();
     try {
-      return checkBeforeStopPipeInternal(pipeName);
+      checkBeforeStopPipeInternal(pipeName);
     } finally {
       releaseReadLock();
     }
   }
 
-  /** @return true if the pipe status is STOPPED before stopping the pipe */
-  private boolean checkBeforeStopPipeInternal(String pipeName) throws 
PipeException {
+  private void checkBeforeStopPipeInternal(String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe does not exist", 
pipeName);
@@ -214,20 +202,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
-    if (pipeStatus == PipeStatus.STOPPED) {
-      final String exceptionMessage =
-          String.format("Failed to stop pipe %s, the pipe is already stop", 
pipeName);
-      LOGGER.warn(exceptionMessage);
-      return true;
-    }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe is already dropped", 
pipeName);
       LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
-
-    return false;
   }
 
   public void checkBeforeDropPipe(String pipeName) {
@@ -269,6 +249,26 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public boolean isPipeRunning(String pipeName) {
+    acquireReadLock();
+    try {
+      return pipeMetaKeeper.containsPipeMeta(pipeName)
+          && PipeStatus.RUNNING.equals(getPipeStatus(pipeName));
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  public boolean isPipeStopped(String pipeName) {
+    acquireReadLock();
+    try {
+      return pipeMetaKeeper.containsPipeMeta(pipeName)
+          && PipeStatus.STOPPED.equals(getPipeStatus(pipeName));
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   /////////////////////////////// Pipe Task Management 
///////////////////////////////
 
   public TSStatus createPipe(CreatePipePlanV2 plan) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 6fb364c6466..17ad3a4f4fc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +72,9 @@ public abstract class AbstractOperatePipeProcedureV2
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
 
+  private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
+      "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
+
   @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv 
configNodeProcedureEnv) {
     LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
@@ -162,9 +166,12 @@ public abstract class AbstractOperatePipeProcedureV2
   /**
    * Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}.
    *
+   * @return true if this procedure can skip subsequent stages (start RUNNING 
pipe or stop STOPPED
+   *     pipe without runtime exception)
    * @throws PipeException if validation for pipe parameters failed
    */
-  protected abstract void executeFromValidateTask(ConfigNodeProcedureEnv env) 
throws PipeException;
+  protected abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv 
env)
+      throws PipeException;
 
   /** Execute at state {@link OperatePipeTaskState#CALCULATE_INFO_FOR_TASK}. */
   protected abstract void 
executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env);
@@ -199,7 +206,14 @@ public abstract class AbstractOperatePipeProcedureV2
     try {
       switch (state) {
         case VALIDATE_TASK:
-          executeFromValidateTask(env);
+          if (executeFromValidateTask(env)) {
+            LOGGER.warn("ProcedureId {}: {}", getProcId(), 
SKIP_PIPE_PROCEDURE_MESSAGE);
+            // On client side, the message returned after the successful 
execution of the pipe
+            // command corresponding to this procedure is "Msg: The statement 
is executed
+            // successfully."
+            
this.setResult(SKIP_PIPE_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
+            return Flow.NO_MORE_STATE;
+          }
           setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
           break;
         case CALCULATE_INFO_FOR_TASK:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 60626062808..a93d8fef580 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -67,10 +67,11 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
 
     // Nothing needs to be checked
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 2e59aa287ca..9ab9c51d715 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -65,10 +65,11 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
 
     // Do nothing
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 4962c868eff..d1a68199c1d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -47,10 +47,11 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
 
     // Do nothing
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 2e3e9b568b7..cafc91e77ee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -75,7 +75,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromValidateTask({})", 
createPipeRequest.getPipeName());
 
@@ -85,6 +85,8 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         .getPipePluginInfo()
         .checkBeforeCreatePipe(createPipeRequest);
     pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
+
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 8958597b2a4..c2323fd76ec 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -58,10 +58,12 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
     pipeTaskInfo.get().checkBeforeDropPipe(pipeName);
+
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index d07634ee8c1..4b5e6e740c5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -44,12 +44,6 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   private String pipeName;
 
-  // This variable is used to record whether the pipe status is RUNNING and to 
determine whether to
-  // skip this procedure.
-  //
-  // Pure in-memory object, not involved in snapshot serialization and 
deserialization.
-  private boolean canSkipSubsequentStages;
-
   public StartPipeProcedureV2() {
     super();
   }
@@ -57,7 +51,6 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public StartPipeProcedureV2(String pipeName) throws PipeException {
     super();
     this.pipeName = pipeName;
-    this.canSkipSubsequentStages = false;
   }
 
   @Override
@@ -66,10 +59,13 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    canSkipSubsequentStages = 
pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+    pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+
+    return pipeTaskInfo.get().isPipeRunning(pipeName)
+        && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
   }
 
   @Override
@@ -83,11 +79,6 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StartPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
-    if (canSkipSubsequentStages) {
-      LOGGER.warn("Pipe status is RUNNING, skip 
executeFromWriteConfigNodeConsensus({})", pipeName);
-      return;
-    }
-
     TSStatus response;
     try {
       response =
@@ -108,11 +99,6 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
-    if (canSkipSubsequentStages) {
-      LOGGER.warn("Pipe status is RUNNING, skip 
executeFromOperateOnDataNodes({})", pipeName);
-      return;
-    }
-
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 7e8cef01877..2a88b4ffe73 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -44,12 +44,6 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   private String pipeName;
 
-  // This variable is used to record whether the pipe status is STOPPED and to 
determine whether to
-  // skip this procedure.
-  //
-  // Pure in-memory object, not involved in snapshot serialization and 
deserialization.
-  private boolean canSkipSubsequentStages;
-
   public StopPipeProcedureV2() {
     super();
   }
@@ -57,7 +51,6 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public StopPipeProcedureV2(String pipeName) throws PipeException {
     super();
     this.pipeName = pipeName;
-    this.canSkipSubsequentStages = false;
   }
 
   @Override
@@ -66,10 +59,13 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
+  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    canSkipSubsequentStages = pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+    pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+
+    return pipeTaskInfo.get().isPipeStopped(pipeName)
+        && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
   }
 
   @Override
@@ -83,11 +79,6 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StopPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
-    if (canSkipSubsequentStages) {
-      LOGGER.warn("Pipe status is STOPPED, skip 
executeFromWriteConfigNodeConsensus({})", pipeName);
-      return;
-    }
-
     TSStatus response;
     try {
       response =
@@ -108,11 +99,6 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
-    if (canSkipSubsequentStages) {
-      LOGGER.warn("Pipe status is STOPPED, skip 
executeFromOperateOnDataNodes({})", pipeName);
-      return;
-    }
-
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {

Reply via email to