This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b754b03ea0 [IOTDB-5787] PipeTaskAgent: Pipe task management on data nodes (#9782)
b754b03ea0 is described below

commit b754b03ea081fda17e0bda9d27d8b7c177374613
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 8 09:03:00 2023 +0800

    [IOTDB-5787] PipeTaskAgent: Pipe task management on data nodes (#9782)
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  28 ++-
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |   7 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |   6 +-
 .../commons/pipe/task/meta/PipeStaticMeta.java     |  20 +-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |   2 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 270 ++++++++++++++++++++-
 6 files changed, 307 insertions(+), 26 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c3975a817d..fa8c1ae9cd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -81,11 +81,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.RUNNING) {
       LOGGER.info(
           String.format("Failed to start pipe [%s], the pipe is already 
running", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(
+          String.format("Failed to start pipe [%s], the pipe is already 
dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
@@ -96,21 +102,29 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.STOPPED) {
       LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already 
stop", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already 
dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
 
   public boolean checkBeforeDropPipe(String pipeName) {
-    if (isPipeExisted(pipeName)) {
-      return true;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Check before drop pipe {}, pipe exists: {}.",
+          pipeName,
+          isPipeExisted(pipeName) ? "true" : "false");
     }
-
-    LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not 
exist", pipeName));
-    return false;
+    // no matter whether the pipe exists, we allow the drop operation executed 
on all nodes to
+    // ensure the consistency
+    return true;
   }
 
   private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 0d9f98101f..7b2f5cac3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -129,8 +129,11 @@ abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<Oper
       throws IOException, InterruptedException, ProcedureException {
     switch (state) {
       case VALIDATE_TASK:
-        rollbackFromValidateTask(env);
-        
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        try {
+          rollbackFromValidateTask(env);
+        } finally {
+          
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        }
         break;
       case CALCULATE_INFO_FOR_TASK:
         rollbackFromCalculateInfoForTask(env);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 995b2a977d..d81a88fe53 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -103,9 +103,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
-            (region, leader) -> {
-              consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, 
leader));
-            });
+            (region, leader) ->
+                // TODO: make index configurable
+                consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, 
leader)));
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 14185dd8a5..eab7408386 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -32,7 +32,7 @@ import java.util.Map;
 public class PipeStaticMeta {
 
   private String pipeName;
-  private long createTime;
+  private long creationTime;
 
   private PipeParameters collectorParameters;
   private PipeParameters processorParameters;
@@ -42,12 +42,12 @@ public class PipeStaticMeta {
 
   public PipeStaticMeta(
       String pipeName,
-      long createTime,
+      long creationTime,
       Map<String, String> collectorAttributes,
       Map<String, String> processorAttributes,
       Map<String, String> connectorAttributes) {
     this.pipeName = pipeName.toUpperCase();
-    this.createTime = createTime;
+    this.creationTime = creationTime;
     collectorParameters = new PipeParameters(collectorAttributes);
     processorParameters = new PipeParameters(processorAttributes);
     connectorParameters = new PipeParameters(connectorAttributes);
@@ -57,8 +57,8 @@ public class PipeStaticMeta {
     return pipeName;
   }
 
-  public long getCreateTime() {
-    return createTime;
+  public long getCreationTime() {
+    return creationTime;
   }
 
   public PipeParameters getCollectorParameters() {
@@ -82,7 +82,7 @@ public class PipeStaticMeta {
 
   public void serialize(DataOutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pipeName, outputStream);
-    ReadWriteIOUtils.write(createTime, outputStream);
+    ReadWriteIOUtils.write(creationTime, outputStream);
 
     outputStream.writeInt(collectorParameters.getAttribute().size());
     for (Map.Entry<String, String> entry : 
collectorParameters.getAttribute().entrySet()) {
@@ -110,7 +110,7 @@ public class PipeStaticMeta {
     final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
 
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
-    pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+    pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
 
     pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
@@ -151,7 +151,7 @@ public class PipeStaticMeta {
     }
     PipeStaticMeta that = (PipeStaticMeta) obj;
     return pipeName.equals(that.pipeName)
-        && createTime == that.createTime
+        && creationTime == that.creationTime
         && collectorParameters.equals(that.collectorParameters)
         && processorParameters.equals(that.processorParameters)
         && connectorParameters.equals(that.connectorParameters);
@@ -168,8 +168,8 @@ public class PipeStaticMeta {
         + "pipeName='"
         + pipeName
         + '\''
-        + ", createTime="
-        + createTime
+        + ", creationTime="
+        + creationTime
         + ", collectorParameters="
         + collectorParameters.getAttribute()
         + ", processorParameters="
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index f490a171f1..501cbf0016 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
 
-/** PipeAgent is the entry point of the pipe module in DatNode. */
+/** PipeAgent is the entry point of the pipe module in DataNode. */
 public class PipeAgent {
 
   private final PipePluginAgent pipePluginAgent;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 9fffa86706..c0df1e1761 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -19,19 +19,283 @@
 
 package org.apache.iotdb.db.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * State transition diagram of a pipe task:
+ *
+ * <p><code>
+ * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
+ * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
+ * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
+ *                                             |                                                            |
+ *                                             | ----------------------> drop pipe -----------------------> |
+ * </code>
+ *
+ * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
+ */
 public class PipeTaskAgent {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskAgent.class);
+
   private final PipeMetaKeeper pipeMetaKeeper;
 
   public PipeTaskAgent() {
     pipeMetaKeeper = new PipeMetaKeeper();
   }
 
-  // TODO: remove this method
-  public PipeMeta getPipeMeta(String pipeName) {
-    return pipeMetaKeeper.getPipeMeta(pipeName);
+  ////////////////////////// Pipe Task Management //////////////////////////
+
+  public void createPipe(PipeMeta pipeMeta) {
+    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+    final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (existedPipeMeta != null) {
+      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
+        switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+          case STOPPED:
+          case RUNNING:
+            LOGGER.info(
+                "Pipe {} (creation time = {}) has already been created. 
Current status = {}. Skip creating.",
+                pipeName,
+                creationTime,
+                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            return;
+          case DROPPED:
+            LOGGER.info(
+                "Pipe {} (creation time = {}) has already been dropped, but 
the pipe task meta has not been cleaned up. "
+                    + "Current status = {}. Try dropping the pipe and 
recreating it.",
+                pipeName,
+                creationTime,
+                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            // break to drop the pipe and recreate it
+            break;
+          default:
+            throw new IllegalStateException(
+                "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        }
+      }
+
+      // drop the pipe if
+      // 1. the pipe with the same name but with different creation time has 
been created before
+      // 2. the pipe with the same name and the same creation time has been 
dropped before, but the
+      //  pipe task meta has not been cleaned up
+      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
+    }
+
+    // build pipe task by consensus group
+    pipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              createPipeTaskByConsensusGroup(
+                  pipeName, creationTime, consensusGroupId, pipeTaskMeta);
+            }));
+    // add pipe meta to pipe meta keeper
+    // note that we do not need to set the status of pipe meta here, because 
the status of pipe meta
+    // is already set to STOPPED when it is created
+    pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
+  }
+
+  public void createPipeTaskByConsensusGroup(
+      String pipeName,
+      long creationTime,
+      TConsensusGroupId consensusGroupId,
+      PipeTaskMeta pipeTaskMeta) {}
+
+  public void dropPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not 
been created. Skip dropping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match 
the creation time ({}) in dropPipe request. Skip dropping.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
+    }
+
+    // mark pipe meta as dropped first. this will help us detect if the pipe 
meta has been dropped
+    // but the pipe task meta has not been cleaned up (in case of failure when 
executing
+    // dropPipeTaskByConsensusGroup).
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+    // drop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              dropPipeTaskByConsensusGroup(pipeName, creationTime, 
consensusGroupId);
+            }));
+    // remove pipe meta from pipe meta keeper
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
+
+  public void dropPipeTaskByConsensusGroup(
+      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) 
{}
+
+  public void dropPipe(String pipeName) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} has already been dropped or has not been created. Skip 
dropping.", pipeName);
+      return;
+    }
+
+    // mark pipe meta as dropped first. this will help us detect if the pipe 
meta has been dropped
+    // but the pipe task meta has not been cleaned up (in case of failure when 
executing
+    // dropPipeTaskByConsensusGroup).
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+    // drop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              dropPipeTaskByConsensusGroup(pipeName, consensusGroupId);
+            }));
+    // remove pipe meta from pipe meta keeper
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
+
+  public void dropPipeTaskByConsensusGroup(String pipeName, TConsensusGroupId 
consensusGroupId) {}
+
+  public void startPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not 
been created. Skip starting.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match 
the creation time ({}) in startPipe request. Skip starting.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been created. Current status = 
{}. Starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break;
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been started. Current 
status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // start pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              startPipeTaskByConsensusGroup(pipeName, creationTime, 
consensusGroupId);
+            }));
+    // set pipe meta status to RUNNING
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+  }
+
+  public void startPipeTaskByConsensusGroup(
+      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) 
{}
+
+  public void stopPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not 
been created. Skip stopping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match 
the creation time ({}) in stopPipe request. Skip stopping.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been stopped. Current 
status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been started. Current status = 
{}. Stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break;
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // stop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              stopPipeTaskByConsensusGroup(pipeName, creationTime, 
consensusGroupId);
+            }));
+    // set pipe meta status to STOPPED
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
+
+  public void stopPipeTaskByConsensusGroup(
+      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) 
{}
 }

Reply via email to