This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5caeeb3e3b7 Pipe: automatically start pipe upon creation & allow to 
start RUNNING pipe & allow to stop STOPPED pipe  (#11690)
5caeeb3e3b7 is described below

commit 5caeeb3e3b7b19a9a06b54f25c9d02e4f39d4f7e
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Dec 12 14:46:18 2023 +0800

    Pipe: automatically start pipe upon creation & allow to start RUNNING pipe 
& allow to stop STOPPED pipe  (#11690)
---
 .../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java     | 22 ++++++++++-------
 .../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java    |  2 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 28 ++++++++++++++--------
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  2 ++
 .../impl/pipe/task/StartPipeProcedureV2.java       | 19 ++++++++++++++-
 .../impl/pipe/task/StopPipeProcedureV2.java        | 19 ++++++++++++++-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  6 ++---
 7 files changed, 74 insertions(+), 24 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index 6c5838488ca..da1b7cbca30 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -83,11 +83,11 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualIT {
 
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("STOPPED")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") && 
o.state.equals("STOPPED")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") && 
o.state.equals("RUNNING")));
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p3") && 
o.state.equals("STOPPED")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p3") && 
o.state.equals("RUNNING")));
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
@@ -148,9 +148,8 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualIT {
 
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("STOPPED")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
 
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("p1").getCode());
       status =
           client.createPipe(
               new TCreatePipeReq("p1", connectorAttributes)
@@ -168,10 +167,17 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualIT {
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
 
       Assert.assertEquals(
-          TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("p1").getCode());
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
       showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("STOPPED")));
+      Assert.assertEquals(1, showPipeResult.stream().filter((o) -> 
o.id.equals("p1")).count());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
+      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("STOPPED")));
       Assert.assertEquals(1, showPipeResult.stream().filter((o) -> 
o.id.equals("p1")).count());
     }
   }
@@ -262,7 +268,7 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualIT {
       Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("*").getCode());
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
-          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("STOPPED")));
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
index 58e248dc251..fe0be803332 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
@@ -80,7 +80,7 @@ public class IoTDBPipeSyntaxIT extends AbstractPipeDualIT {
       for (String pipeName : validPipeNames) {
         Assert.assertTrue(
             showPipeResult.stream()
-                .anyMatch((o) -> o.id.equals(pipeName) && 
o.state.equals("STOPPED")));
+                .anyMatch((o) -> o.id.equals(pipeName) && 
o.state.equals("RUNNING")));
       }
 
       for (String pipeName : validPipeNames) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index f6e0003695b..a71f7deb2d3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -158,16 +158,18 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  public void checkBeforeStartPipe(String pipeName) throws PipeException {
+  /** @return true if the pipe status is RUNNING before starting the pipe */
+  public boolean checkBeforeStartPipe(String pipeName) throws PipeException {
     acquireReadLock();
     try {
-      checkBeforeStartPipeInternal(pipeName);
+      return checkBeforeStartPipeInternal(pipeName);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeStartPipeInternal(String pipeName) throws 
PipeException {
+  /** @return true if the pipe status is RUNNING before starting the pipe */
+  private boolean checkBeforeStartPipeInternal(String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe does not exist", 
pipeName);
@@ -179,8 +181,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
     if (pipeStatus == PipeStatus.RUNNING) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe is already 
running", pipeName);
-      LOGGER.info(exceptionMessage);
-      throw new PipeException(exceptionMessage);
+      LOGGER.warn(exceptionMessage);
+      return true;
     }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
@@ -188,18 +190,22 @@ public class PipeTaskInfo implements SnapshotProcessor {
       LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
+
+    return false;
   }
 
-  public void checkBeforeStopPipe(String pipeName) throws PipeException {
+  /** @return true if the pipe status is STOPPED before stopping the pipe */
+  public boolean checkBeforeStopPipe(String pipeName) throws PipeException {
     acquireReadLock();
     try {
-      checkBeforeStopPipeInternal(pipeName);
+      return checkBeforeStopPipeInternal(pipeName);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeStopPipeInternal(String pipeName) throws 
PipeException {
+  /** @return true if the pipe status is STOPPED before stopping the pipe */
+  private boolean checkBeforeStopPipeInternal(String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe does not exist", 
pipeName);
@@ -211,8 +217,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
     if (pipeStatus == PipeStatus.STOPPED) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe is already stop", 
pipeName);
-      LOGGER.info(exceptionMessage);
-      throw new PipeException(exceptionMessage);
+      LOGGER.warn(exceptionMessage);
+      return true;
     }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
@@ -220,6 +226,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
       LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
+
+    return false;
   }
 
   public void checkBeforeDropPipe(String pipeName) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 9eac9be52d3..2e3e9b568b7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
@@ -120,6 +121,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
               }
             });
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
+    pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 5b43ac83d8d..d07634ee8c1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -44,6 +44,12 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   private String pipeName;
 
+  // This variable is used to record whether the pipe status is RUNNING and to 
determine whether to
+  // skip this procedure.
+  //
+  // Pure in-memory object, not involved in snapshot serialization and 
deserialization.
+  private boolean canSkipSubsequentStages;
+
   public StartPipeProcedureV2() {
     super();
   }
@@ -51,6 +57,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public StartPipeProcedureV2(String pipeName) throws PipeException {
     super();
     this.pipeName = pipeName;
+    this.canSkipSubsequentStages = false;
   }
 
   @Override
@@ -62,7 +69,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+    canSkipSubsequentStages = 
pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
   }
 
   @Override
@@ -76,6 +83,11 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StartPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
+    if (canSkipSubsequentStages) {
+      LOGGER.warn("Pipe status is RUNNING, skip 
executeFromWriteConfigNodeConsensus({})", pipeName);
+      return;
+    }
+
     TSStatus response;
     try {
       response =
@@ -96,6 +108,11 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
+    if (canSkipSubsequentStages) {
+      LOGGER.warn("Pipe status is RUNNING, skip 
executeFromOperateOnDataNodes({})", pipeName);
+      return;
+    }
+
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 3ac00496a00..7e8cef01877 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -44,6 +44,12 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   private String pipeName;
 
+  // This variable is used to record whether the pipe status is STOPPED and to 
determine whether to
+  // skip this procedure.
+  //
+  // Pure in-memory object, not involved in snapshot serialization and 
deserialization.
+  private boolean canSkipSubsequentStages;
+
   public StopPipeProcedureV2() {
     super();
   }
@@ -51,6 +57,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public StopPipeProcedureV2(String pipeName) throws PipeException {
     super();
     this.pipeName = pipeName;
+    this.canSkipSubsequentStages = false;
   }
 
   @Override
@@ -62,7 +69,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+    canSkipSubsequentStages = pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
   }
 
   @Override
@@ -76,6 +83,11 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StopPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
+    if (canSkipSubsequentStages) {
+      LOGGER.warn("Pipe status is STOPPED, skip 
executeFromWriteConfigNodeConsensus({})", pipeName);
+      return;
+    }
+
     TSStatus response;
     try {
       response =
@@ -96,6 +108,11 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
+    if (canSkipSubsequentStages) {
+      LOGGER.warn("Pipe status is STOPPED, skip 
executeFromOperateOnDataNodes({})", pipeName);
+      return;
+    }
+
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 1ab5eb7cc81..c1f6170e85a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -66,9 +66,9 @@ import java.util.stream.Collectors;
  * State transition diagram of a pipe task:
  *
  * <p><code>
- * |----------------|                     |---------| --> start pipe --> 
|---------|                   |---------|
- * | initial status | --> create pipe --> | STOPPED |                    | 
RUNNING | --> drop pipe --> | DROPPED |
- * |----------------|                     |---------| <-- stop  pipe <-- 
|---------|                   |---------|
+ * |----------------|                     |---------| --> stop  pipe --> 
|---------|                   |---------|
+ * | initial status | --> create pipe --> | RUNNING |                    | 
STOPPED | --> drop pipe --> | DROPPED |
+ * |----------------|                     |---------| <-- start pipe <-- 
|---------|                   |---------|
  *                                             |                               
                             |
  *                                             | ----------------------> drop 
pipe -----------------------> |
  * </code>

Reply via email to