This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b833e3b66d1 [IOTDB-5895][IOTDB-5904] Pipe: handling DataNode removal (#9881)
b833e3b66d1 is described below

commit b833e3b66d1ce2ea21b5f58c9a5d3b4eba6292cd
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 22 01:29:08 2023 +0800

    [IOTDB-5895][IOTDB-5904] Pipe: handling DataNode removal (#9881)
    
    - [IOTDB-5895] Added DataNode removal handling logic for Pipe
    
    - [IOTDB-5904] CI fails on IoTDBClusterRestartIT.clusterRestartTest
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../apache/iotdb/commons/service/ServiceType.java  |   1 +
 .../iotdb/db/pipe/agent/runtime/PipeLauncher.java  |  18 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  43 ++-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 350 +++++++++++----------
 .../execution/executor/PipeSubtaskExecutor.java    |   2 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  10 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +-
 7 files changed, 244 insertions(+), 186 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 94d524defcf..7bde2af3102 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -75,6 +75,7 @@ public enum ServiceType {
   IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
   PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
       "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
+  PIPE_RUNTIME_AGENT("Pipe Runtime Agent", "PipeRuntimeAgent"),
   MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
   private final String name;
   private final String jmxName;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
index 2d0d45ebc02..30ab6aee862 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
@@ -46,12 +46,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class PipeLauncher {
+class PipeLauncher {
 
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  public void launchPipePluginAgent(ResourcesInformationHolder 
resourcesInformationHolder)
-      throws StartupException {
+  private PipeLauncher() {
+    // forbidding instantiation
+  }
+
+  public static synchronized void launchPipePluginAgent(
+      ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
     initPipePluginRelatedInstances();
 
     if (resourcesInformationHolder.getPipePluginMetaList() == null
@@ -87,7 +91,7 @@ public class PipeLauncher {
     }
   }
 
-  private void initPipePluginRelatedInstances() throws StartupException {
+  private static void initPipePluginRelatedInstances() throws StartupException 
{
     try {
       PipePluginExecutableManager.setupAndGetInstance(
           IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
@@ -97,7 +101,7 @@ public class PipeLauncher {
     }
   }
 
-  private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
+  private static List<PipePluginMeta> 
getUninstalledOrConflictedPipePluginMetaList(
       ResourcesInformationHolder resourcesInformationHolder) {
     final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
     for (PipePluginMeta pipePluginMeta : 
resourcesInformationHolder.getPipePluginMetaList()) {
@@ -122,7 +126,7 @@ public class PipeLauncher {
     return pipePluginMetaList;
   }
 
-  private void fetchAndSavePipePluginJars(List<PipePluginMeta> 
pipePluginMetaList)
+  private static void fetchAndSavePipePluginJars(List<PipePluginMeta> 
pipePluginMetaList)
       throws StartupException {
     try (ConfigNodeClient configNodeClient =
         
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
@@ -143,7 +147,7 @@ public class PipeLauncher {
     }
   }
 
-  public void launchPipeTaskAgent() throws StartupException {
+  public static synchronized void launchPipeTaskAgent() throws 
StartupException {
     try (final ConfigNodeClient configNodeClient =
         
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
       final TGetAllPipeInfoResp getAllPipeInfoResp = 
configNodeClient.getAllPipeInfo();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2d85aac9e8c..5253d5a2db2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
@@ -27,15 +30,43 @@ import 
org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipeRuntimeAgent {
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeRuntimeAgent implements IService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeAgent.class);
 
-  public synchronized void launch(ResourcesInformationHolder 
resourcesInformationHolder)
-      throws StartupException {
-    final PipeLauncher pipeLauncher = new PipeLauncher();
-    pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
-    pipeLauncher.launchPipeTaskAgent();
+  private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  public synchronized void launchPipePluginAgent(
+      ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
+    PipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+  }
+
+  @Override
+  public synchronized void start() throws StartupException {
+    PipeLauncher.launchPipeTaskAgent();
+
+    isShutdown.set(false);
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (isShutdown.get()) {
+      return;
+    }
+    isShutdown.set(true);
+
+    PipeAgent.task().dropAllPipeTasks();
+  }
+
+  public boolean isShutdown() {
+    return isShutdown.get();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.PIPE_RUNTIME_AGENT;
   }
 
   public void report(PipeSubtask subtask) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 04a3c6a05a1..e231d818dbf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
 import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
@@ -67,9 +68,184 @@ public class PipeTaskAgent {
     pipeTaskManager = new PipeTaskManager();
   }
 
-  ////////////////////////// Pipe Task Management //////////////////////////
+  ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
-  public synchronized void createPipe(PipeMeta pipeMeta) {
+  // TODO: handle progress index
+  public synchronized void handlePipeMetaChanges(List<PipeMeta> 
pipeMetaListFromConfigNode) {
+    // do nothing if data node is removing or removed
+    if (PipeAgent.runtime().isShutdown()) {
+      return;
+    }
+
+    // iterate through pipe meta list from config node, check if pipe meta 
exists on data node
+    // or has changed
+    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
+      final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
+
+      final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+      // if pipe meta does not exist on data node, create a new pipe
+      if (metaOnDataNode == null) {
+        createPipe(metaFromConfigNode);
+        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+        }
+        // if the status is STOPPED or DROPPED, do nothing
+        continue;
+      }
+
+      // if pipe meta exists on data node, check if it has changed
+      final PipeStaticMeta staticMetaOnDataNode = 
metaOnDataNode.getStaticMeta();
+      final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
+
+      // first check if pipe static meta has changed, if so, drop the pipe and 
create a new one
+      if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+        dropPipe(pipeName);
+        createPipe(metaFromConfigNode);
+        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+        }
+        // if the status is STOPPED or DROPPED, do nothing
+        continue;
+      }
+
+      // then check if pipe runtime meta has changed, if so, update the pipe
+      final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
+      final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
+      handlePipeRuntimeMetaChanges(
+          staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
+    }
+
+    // check if there are pipes on data node that do not exist on config node, 
if so, drop them
+    final Set<String> pipeNamesFromConfigNode =
+        pipeMetaListFromConfigNode.stream()
+            .map(meta -> meta.getStaticMeta().getPipeName())
+            .collect(Collectors.toSet());
+    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
+      if 
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
 {
+        dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+      }
+    }
+  }
+
+  private void handlePipeRuntimeMetaChanges(
+      @NotNull PipeStaticMeta pipeStaticMeta,
+      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
+      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
+    // 1. handle data region group leader changed first
+    final Map<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToTaskMetaMapFromConfigNode =
+        runtimeMetaFromConfigNode.getConsensusGroupIdToTaskMetaMap();
+    final Map<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToTaskMetaMapOnDataNode =
+        runtimeMetaOnDataNode.getConsensusGroupIdToTaskMetaMap();
+
+    // 1.1 iterate over all consensus group ids in config node's pipe runtime 
meta, decide if we
+    // need to drop and create a new task for each consensus group id
+    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
+        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
+      final TConsensusGroupId consensusGroupIdFromConfigNode = 
entryFromConfigNode.getKey();
+
+      final PipeTaskMeta taskMetaFromConfigNode = 
entryFromConfigNode.getValue();
+      final PipeTaskMeta taskMetaOnDataNode =
+          
consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
+
+      // if task meta does not exist on data node, create a new task
+      if (taskMetaOnDataNode == null) {
+        createPipeTask(
+            consensusGroupIdFromConfigNode,
+            pipeStaticMeta,
+            taskMetaFromConfigNode.getProgressIndex(),
+            taskMetaFromConfigNode.getRegionLeader());
+        // we keep the new created task's status consistent with the status 
recorded in data node's
+        // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
+        // is not reliable, but we will have a check later to make sure the 
status is correct.
+        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
+          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+        }
+        continue;
+      }
+
+      // if task meta exists on data node, check if it has changed
+      final int regionLeaderFromConfigNode = 
taskMetaFromConfigNode.getRegionLeader();
+      final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+
+      if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+        createPipeTask(
+            consensusGroupIdFromConfigNode,
+            pipeStaticMeta,
+            taskMetaFromConfigNode.getProgressIndex(),
+            taskMetaFromConfigNode.getRegionLeader());
+        // we keep the new created task's status consistent with the status 
recorded in data node's
+        // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
+        // is not reliable, but we will have a check later to make sure the 
status is correct.
+        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
+          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+        }
+      }
+    }
+
+    // 1.2 iterate over all consensus group ids on data node's pipe runtime 
meta, decide if we need
+    // to drop any task. we do not need to create any new task here because we 
have already done
+    // that in 1.1.
+    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
+        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
+      final TConsensusGroupId consensusGroupIdOnDataNode = 
entryOnDataNode.getKey();
+      final PipeTaskMeta taskMetaFromConfigNode =
+          
consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
+      if (taskMetaFromConfigNode == null) {
+        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
+      }
+    }
+
+    // 2. handle pipe runtime meta status changes
+    final PipeStatus statusFromConfigNode = 
runtimeMetaFromConfigNode.getStatus().get();
+    final PipeStatus statusOnDataNode = 
runtimeMetaOnDataNode.getStatus().get();
+    if (statusFromConfigNode == statusOnDataNode) {
+      return;
+    }
+
+    switch (statusFromConfigNode) {
+      case RUNNING:
+        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
+          startPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
+        } else {
+          throw new IllegalStateException(
+              String.format(
+                  "Unknown pipe status %s for pipe %s",
+                  statusOnDataNode, pipeStaticMeta.getPipeName()));
+        }
+        break;
+      case STOPPED:
+        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
+          stopPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
+        } else {
+          throw new IllegalStateException(
+              String.format(
+                  "Unknown pipe status %s for pipe %s",
+                  statusOnDataNode, pipeStaticMeta.getPipeName()));
+        }
+        break;
+      case DROPPED:
+        // this should not happen, but we still handle it here
+        dropPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
+        break;
+      default:
+        throw new IllegalStateException(
+            String.format(
+                "Unknown pipe status %s for pipe %s",
+                statusFromConfigNode, pipeStaticMeta.getPipeName()));
+    }
+  }
+
+  public synchronized void dropAllPipeTasks() {
+    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+      dropPipe(pipeMeta.getStaticMeta().getPipeName(), 
pipeMeta.getStaticMeta().getCreationTime());
+    }
+  }
+
+  ////////////////////////// Manage by Pipe Name //////////////////////////
+
+  private void createPipe(PipeMeta pipeMeta) {
     final String pipeName = pipeMeta.getStaticMeta().getPipeName();
     final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
 
@@ -120,7 +296,7 @@ public class PipeTaskAgent {
     pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
   }
 
-  public synchronized void dropPipe(String pipeName, long creationTime) {
+  private void dropPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -162,7 +338,7 @@ public class PipeTaskAgent {
     pipeMetaKeeper.removePipeMeta(pipeName);
   }
 
-  public synchronized void dropPipe(String pipeName) {
+  private void dropPipe(String pipeName) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -192,7 +368,7 @@ public class PipeTaskAgent {
     pipeMetaKeeper.removePipeMeta(pipeName);
   }
 
-  public synchronized void startPipe(String pipeName, long creationTime) {
+  private void startPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -256,7 +432,7 @@ public class PipeTaskAgent {
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
   }
 
-  public synchronized void stopPipe(String pipeName, long creationTime) {
+  private void stopPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -320,167 +496,7 @@ public class PipeTaskAgent {
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
 
-  // TODO: handle progress index
-  public synchronized void handlePipeMetaChanges(List<PipeMeta> 
pipeMetaListFromConfigNode) {
-    // iterate through pipe meta list from config node, check if pipe meta 
exists on data node
-    // or has changed
-    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
-      final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
-
-      final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
-      // if pipe meta does not exist on data node, create a new pipe
-      if (metaOnDataNode == null) {
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
-          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
-        }
-        // if the status is STOPPED or DROPPED, do nothing
-        continue;
-      }
-
-      // if pipe meta exists on data node, check if it has changed
-      final PipeStaticMeta staticMetaOnDataNode = 
metaOnDataNode.getStaticMeta();
-      final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
-
-      // first check if pipe static meta has changed, if so, drop the pipe and 
create a new one
-      if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
-        dropPipe(pipeName);
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
-          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
-        }
-        // if the status is STOPPED or DROPPED, do nothing
-        continue;
-      }
-
-      // then check if pipe runtime meta has changed, if so, update the pipe
-      final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
-      final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
-      handlePipeRuntimeMetaChanges(
-          staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
-    }
-
-    // check if there are pipes on data node that do not exist on config node, 
if so, drop them
-    final Set<String> pipeNamesFromConfigNode =
-        pipeMetaListFromConfigNode.stream()
-            .map(meta -> meta.getStaticMeta().getPipeName())
-            .collect(Collectors.toSet());
-    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
-      if 
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
 {
-        dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
-      }
-    }
-  }
-
-  private void handlePipeRuntimeMetaChanges(
-      @NotNull PipeStaticMeta pipeStaticMeta,
-      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
-      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
-    // 1. handle data region group leader changed first
-    final Map<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToTaskMetaMapFromConfigNode =
-        runtimeMetaFromConfigNode.getConsensusGroupIdToTaskMetaMap();
-    final Map<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToTaskMetaMapOnDataNode =
-        runtimeMetaOnDataNode.getConsensusGroupIdToTaskMetaMap();
-
-    // 1.1 iterate over all consensus group ids in config node's pipe runtime 
meta, decide if we
-    // need to drop and create a new task for each consensus group id
-    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
-        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
-      final TConsensusGroupId consensusGroupIdFromConfigNode = 
entryFromConfigNode.getKey();
-
-      final PipeTaskMeta taskMetaFromConfigNode = 
entryFromConfigNode.getValue();
-      final PipeTaskMeta taskMetaOnDataNode =
-          
consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
-
-      // if task meta does not exist on data node, create a new task
-      if (taskMetaOnDataNode == null) {
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
-        // we keep the new created task's status consistent with the status 
recorded in data node's
-        // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
-        // is not reliable, but we will have a check later to make sure the 
status is correct.
-        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
-          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
-        }
-        continue;
-      }
-
-      // if task meta exists on data node, check if it has changed
-      final int regionLeaderFromConfigNode = 
taskMetaFromConfigNode.getRegionLeader();
-      final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
-
-      if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
-        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
-        // we keep the new created task's status consistent with the status 
recorded in data node's
-        // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
-        // is not reliable, but we will have a check later to make sure the 
status is correct.
-        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
-          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
-        }
-      }
-    }
-
-    // 1.2 iterate over all consensus group ids on data node's pipe runtime 
meta, decide if we need
-    // to drop any task. we do not need to create any new task here because we 
have already done
-    // that in 1.1.
-    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
-        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
-      final TConsensusGroupId consensusGroupIdOnDataNode = 
entryOnDataNode.getKey();
-      final PipeTaskMeta taskMetaFromConfigNode =
-          
consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
-      if (taskMetaFromConfigNode == null) {
-        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
-      }
-    }
-
-    // 2. handle pipe runtime meta status changes
-    final PipeStatus statusFromConfigNode = 
runtimeMetaFromConfigNode.getStatus().get();
-    final PipeStatus statusOnDataNode = 
runtimeMetaOnDataNode.getStatus().get();
-    if (statusFromConfigNode == statusOnDataNode) {
-      return;
-    }
-
-    switch (statusFromConfigNode) {
-      case RUNNING:
-        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
-          startPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Unknown pipe status %s for pipe %s",
-                  statusOnDataNode, pipeStaticMeta.getPipeName()));
-        }
-        break;
-      case STOPPED:
-        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
-          stopPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Unknown pipe status %s for pipe %s",
-                  statusOnDataNode, pipeStaticMeta.getPipeName()));
-        }
-        break;
-      case DROPPED:
-        // this should not happen, but we still handle it here
-        dropPipe(pipeStaticMeta.getPipeName(), 
pipeStaticMeta.getCreationTime());
-        break;
-      default:
-        throw new IllegalStateException(
-            String.format(
-                "Unknown pipe status %s for pipe %s",
-                statusFromConfigNode, pipeStaticMeta.getPipeName()));
-    }
-  }
+  ///////////////////////// Manage by dataRegionGroupId 
/////////////////////////
 
   private void createPipeTask(
       TConsensusGroupId dataRegionGroupId,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 2952476dbbe..762561546b3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -58,7 +58,7 @@ public abstract class PipeSubtaskExecutor {
 
   /////////////////////// subtask management ///////////////////////
 
-  public final void register(PipeSubtask subtask) {
+  public final synchronized void register(PipeSubtask subtask) {
     if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
       LOGGER.warn("The subtask {} is already registered.", 
subtask.getTaskID());
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 44a5690db8c..db28843d7c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -540,6 +540,8 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(RegionMigrateService.getInstance());
 
     registerManager.register(CompactionTaskManager.getInstance());
+
+    registerManager.register(PipeAgent.runtime());
   }
 
   /** set up RPC and protocols after DataNode is available */
@@ -834,17 +836,17 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launch(resourcesInformationHolder);
+    PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder);
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
-    if (allPipeInformation != null && !allPipeInformation.isEmpty()) {
-      List<PipePluginMeta> list = new ArrayList<>();
+    final List<PipePluginMeta> list = new ArrayList<>();
+    if (allPipeInformation != null) {
       for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
         list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
       }
-      resourcesInformationHolder.setPipePluginMetaList(list);
     }
+    resourcesInformationHolder.setPipePluginMetaList(list);
   }
 
   private void initSchemaEngine() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 6e0f82681a6..86feb49b8db 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1180,7 +1180,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TSStatus setSystemStatus(String status) throws TException {
     try {
-      
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse(status));
+      final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+      commonConfig.setNodeStatus(NodeStatus.parse(status));
+      if (commonConfig.getNodeStatus().equals(NodeStatus.Removing)) {
+        PipeAgent.runtime().stop();
+      }
     } catch (Exception e) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
     }

Reply via email to