This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f3f4e0d92f [IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)
6f3f4e0d92f is described below

commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2
Author: Caideyipi <[email protected]>
AuthorDate: Sun May 21 05:22:00 2023 +0800

    [IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/confignode/manager/ConfigManager.java    |  11 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   8 -
 .../iotdb/confignode/manager/pipe/PipeManager.java |   3 +
 .../pipe/{ => plugin}/PipePluginCoordinator.java   |   2 +-
 .../manager/pipe/{ => runtime}/PipeMetaSyncer.java |   2 +-
 .../pipe/{ => runtime}/PipeRuntimeCoordinator.java |   2 +-
 .../pipe/{ => task}/PipeTaskCoordinator.java       |  23 ++-
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   7 -
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  17 ---
 .../iotdb/db/pipe/agent/runtime/PipeLauncher.java  | 166 +++++++++++++++++++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   9 ++
 .../java/org/apache/iotdb/db/service/DataNode.java |  94 +-----------
 .../db/sync/common/ClusterSyncInfoFetcher.java     |  11 +-
 .../src/main/thrift/confignode.thrift              |   9 +-
 16 files changed, 198 insertions(+), 170 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 11d2bd2912e..a23b6b44412 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -144,7 +144,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1624,18 +1623,10 @@ public class ConfigManager implements IManager {
   public TGetAllPipeInfoResp getAllPipeInfo() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().showPipes()
+        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
         : new TGetAllPipeInfoResp().setStatus(status);
   }
 
-  @Override
-  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
-    TSStatus status = confirmLeader();
-    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
-        : status;
-  }
-
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 4a305f822bc..5eea6b30fdf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,7 +87,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -598,13 +597,6 @@ public interface IManager {
    */
   TGetAllPipeInfoResp getAllPipeInfo();
 
-  /**
-   * Record PipeMessage
-   *
-   * @return TSStatus
-   */
-  TSStatus recordPipeMessage(TRecordPipeMessageReq req);
-
   /**
    * Get RegionId。used for Show cluster slots information in
    * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index ec9ffd63009..be9440796fa 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 
 public class PipeManager {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
similarity index 98%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
index 8943a28e8c2..7435344edbf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.plugin;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
similarity index 97%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 515abfdbdd1..ba9bba61c6a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
similarity index 98%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 81df8587176..9a117a0fbd1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
similarity index 91%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index e01f4b785bc..573c1cb1a80 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
@@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -76,7 +75,14 @@ public class PipeTaskCoordinator {
     return configManager.getProcedureManager().dropPipe(pipeName);
   }
 
-  public TGetAllPipeInfoResp showPipes() {
+  public TShowPipeResp showPipes(TShowPipeReq req) {
+    return ((PipeTableResp)
+            configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
+        .filter(req.whereClause, req.pipeName)
+        .convertToTShowPipeResp();
+  }
+
+  public TGetAllPipeInfoResp getAllPipeInfo() {
     try {
       return ((PipeTableResp)
               configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
@@ -89,15 +95,4 @@ public class PipeTaskCoordinator {
           Collections.emptyList());
     }
   }
-
-  public TShowPipeResp showPipes(TShowPipeReq req) {
-    return ((PipeTableResp)
-            configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
-        .filter(req.whereClause, req.pipeName)
-        .convertToTShowPipeResp();
-  }
-
-  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index b4790661278..88f40bdc114 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 7c8268ecd02..d44df9ac580 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 22367d7c164..c2b44e07de1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -127,7 +127,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -909,12 +908,6 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.getAllPipeInfo();
   }
 
-  @Override
-  @Deprecated
-  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
-    return configManager.recordPipeMessage(req);
-  }
-
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeStamp() && req.getType() != 
TConsensusGroupType.DataRegion) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ca35b446fc3..4376f6a4eec 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -96,7 +96,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1880,22 +1879,6 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
-  @Override
-  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws 
TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
-      try {
-        TSStatus status = client.recordPipeMessage(req);
-        if (!updateConfigNodeLeader(status)) {
-          return status;
-        }
-      } catch (TException e) {
-        configLeader = null;
-      }
-      waitAndReconnect();
-    }
-    throw new TException(MSG_RECONNECTION_FAIL);
-  }
-
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
new file mode 100644
index 00000000000..2d0d45ebc02
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeLauncher {
+
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  public void launchPipePluginAgent(ResourcesInformationHolder 
resourcesInformationHolder)
+      throws StartupException {
+    initPipePluginRelatedInstances();
+
+    if (resourcesInformationHolder.getPipePluginMetaList() == null
+        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+      return;
+    }
+
+    final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList =
+        
getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder);
+    int index = 0;
+    while (index < uninstalledOrConflictedPipePluginMetaList.size()) {
+      List<PipePluginMeta> curList = new ArrayList<>();
+      int offset = 0;
+      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+          && index + offset < 
uninstalledOrConflictedPipePluginMetaList.size()) {
+        curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + 
offset));
+        offset++;
+      }
+      index += (offset + 1);
+      fetchAndSavePipePluginJars(curList);
+    }
+
+    // create instances of pipe plugins and do registration
+    try {
+      for (PipePluginMeta meta : 
resourcesInformationHolder.getPipePluginMetaList()) {
+        if (meta.isBuiltin()) {
+          continue;
+        }
+        PipeAgent.plugin().doRegister(meta);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void initPipePluginRelatedInstances() throws StartupException {
+    try {
+      PipePluginExecutableManager.setupAndGetInstance(
+          IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
+      
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
+      ResourcesInformationHolder resourcesInformationHolder) {
+    final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
+    for (PipePluginMeta pipePluginMeta : 
resourcesInformationHolder.getPipePluginMetaList()) {
+      if (pipePluginMeta.isBuiltin()) {
+        continue;
+      }
+      // If jar does not exist, add current pipePluginMeta to list
+      if (!PipePluginExecutableManager.getInstance()
+          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+        pipePluginMetaList.add(pipePluginMeta);
+      } else {
+        try {
+          // local jar has conflicts with jar on config node, add current 
pipePluginMeta to list
+          if 
(!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+            pipePluginMetaList.add(pipePluginMeta);
+          }
+        } catch (PipeManagementException e) {
+          pipePluginMetaList.add(pipePluginMeta);
+        }
+      }
+    }
+    return pipePluginMetaList;
+  }
+
+  private void fetchAndSavePipePluginJars(List<PipePluginMeta> 
pipePluginMetaList)
+      throws StartupException {
+    try (ConfigNodeClient configNodeClient =
+        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+      final List<String> jarNameList =
+          
pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+      final TGetJarInListResp resp =
+          configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+      if (resp.getStatus().getCode() == 
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get pipe plugin jar from config 
node.");
+      }
+      final List<ByteBuffer> jarList = resp.getJarList();
+      for (int i = 0; i < pipePluginMetaList.size(); i++) {
+        PipePluginExecutableManager.getInstance()
+            .saveToInstallDir(jarList.get(i), 
pipePluginMetaList.get(i).getJarName());
+      }
+    } catch (IOException | TException | ClientManagerException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  public void launchPipeTaskAgent() throws StartupException {
+    try (final ConfigNodeClient configNodeClient =
+        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+      final TGetAllPipeInfoResp getAllPipeInfoResp = 
configNodeClient.getAllPipeInfo();
+      if (getAllPipeInfoResp.getStatus().getCode()
+          == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get pipe task meta from config 
node.");
+      }
+
+      PipeAgent.task()
+          .handlePipeMetaChanges(
+              getAllPipeInfoResp.getAllPipeInfo().stream()
+                  .map(PipeMeta::deserialize)
+                  .collect(Collectors.toList()));
+    } catch (StartupException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 1160ff4a845..2d85aac9e8c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
+import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
 import org.slf4j.Logger;
@@ -29,6 +31,13 @@ public class PipeRuntimeAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeAgent.class);
 
+  public synchronized void launch(ResourcesInformationHolder 
resourcesInformationHolder)
+      throws StartupException {
+    final PipeLauncher pipeLauncher = new PipeLauncher();
+    pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+    pipeLauncher.launchPipeTaskAgent();
+  }
+
   public void report(PipeSubtask subtask) {
     // TODO: terminate the task by the given taskID
     LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3e0f39ac4ff..44a5690db8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
-import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -84,7 +82,6 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
@@ -461,7 +458,7 @@ public class DataNode implements DataNodeMBean {
   private void prepareResources() throws StartupException {
     prepareUDFResources();
     prepareTriggerResources();
-    preparePipePluginResources();
+    preparePipeResources();
   }
 
   /** register services and set up DataNode */
@@ -836,93 +833,8 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  private void preparePipePluginResources() throws StartupException {
-    initPipePluginRelatedInstance();
-    if (resourcesInformationHolder.getPipePluginMetaList() == null
-        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
-      return;
-    }
-
-    // get jars from config node
-    List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
-    int index = 0;
-    while (index < pipePluginNeedJarList.size()) {
-      List<PipePluginMeta> curList = new ArrayList<>();
-      int offset = 0;
-      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
-          && index + offset < pipePluginNeedJarList.size()) {
-        curList.add(pipePluginNeedJarList.get(index + offset));
-        offset++;
-      }
-      index += (offset + 1);
-      getJarOfPipePlugins(curList);
-    }
-
-    // create instances of pipe plugins and do registration
-    try {
-      for (PipePluginMeta meta : 
resourcesInformationHolder.getPipePluginMetaList()) {
-        if (meta.isBuiltin()) {
-          continue;
-        }
-        PipeAgent.plugin().doRegister(meta);
-      }
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private void initPipePluginRelatedInstance() throws StartupException {
-    try {
-      PipePluginExecutableManager.setupAndGetInstance(
-          config.getPipeTemporaryLibDir(), config.getPipeDir());
-      PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
-    } catch (IOException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private List<PipePluginMeta> getJarListForPipePlugin() {
-    List<PipePluginMeta> res = new ArrayList<>();
-    for (PipePluginMeta pipePluginMeta : 
resourcesInformationHolder.getPipePluginMetaList()) {
-      if (pipePluginMeta.isBuiltin()) {
-        continue;
-      }
-      // If jar does not exist, add current pipePluginMeta to list
-      if (!PipePluginExecutableManager.getInstance()
-          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
-        res.add(pipePluginMeta);
-      } else {
-        try {
-          // local jar has conflicts with jar on config node, add current 
pipePluginMeta to list
-          if 
(!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
-            res.add(pipePluginMeta);
-          }
-        } catch (PipeManagementException e) {
-          res.add(pipePluginMeta);
-        }
-      }
-    }
-    return res;
-  }
-
-  private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
-      throws StartupException {
-    try (ConfigNodeClient configNodeClient =
-        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-      List<String> jarNameList =
-          
pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
-      TGetJarInListResp resp = configNodeClient.getPipePluginJar(new 
TGetJarInListReq(jarNameList));
-      if (resp.getStatus().getCode() == 
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
-        throw new StartupException("Failed to get pipe plugin jar from config 
node.");
-      }
-      List<ByteBuffer> jarList = resp.getJarList();
-      for (int i = 0; i < pipePluginMetaList.size(); i++) {
-        PipePluginExecutableManager.getInstance()
-            .saveToInstallDir(jarList.get(i), 
pipePluginMetaList.get(i).getJarName());
-      }
-    } catch (IOException | TException | ClientManagerException e) {
-      throw new StartupException(e);
-    }
+  private void preparePipeResources() throws StartupException {
+    PipeAgent.runtime().launch(resourcesInformationHolder);
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 6c4c6282a7f..929a7ce4f47 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -126,15 +125,7 @@ public class ClusterSyncInfoFetcher implements 
ISyncInfoFetcher {
 
   @Override
   public TSStatus recordMsg(String pipeName, PipeMessage message) {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TRecordPipeMessageReq req =
-          new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
-      return configNodeClient.recordPipeMessage(req);
-    } catch (Exception e) {
-      LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
-      return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
-    }
+    return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported");
   }
 
   // endregion
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 7607d0cedfc..13f3461981a 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -600,11 +600,7 @@ struct TGetPathsSetTemplatesResp {
   2: optional list<string> pathList
 }
 
-// SYNC
-struct TRecordPipeMessageReq{
-  1: required string pipeName
-  2: required binary message
-}
+// Pipe
 
 struct TShowPipeInfo {
   1: required string id
@@ -1294,9 +1290,6 @@ service IConfigNodeRPCService {
   /* Get all pipe information. It is used for DataNode registration and 
restart*/
   TGetAllPipeInfoResp getAllPipeInfo();
 
-  /* Get all pipe information. It is used for DataNode registration and 
restart*/
-  common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
-
   // ======================================================
   // TestTools
   // ======================================================

Reply via email to