This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f3f4e0d92f [IOTDB-5893] Pipe: PipeLauncher for data node setup
process (#9879)
6f3f4e0d92f is described below
commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2
Author: Caideyipi <[email protected]>
AuthorDate: Sun May 21 05:22:00 2023 +0800
[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/confignode/manager/ConfigManager.java | 11 +-
.../apache/iotdb/confignode/manager/IManager.java | 8 -
.../iotdb/confignode/manager/pipe/PipeManager.java | 3 +
.../pipe/{ => plugin}/PipePluginCoordinator.java | 2 +-
.../manager/pipe/{ => runtime}/PipeMetaSyncer.java | 2 +-
.../pipe/{ => runtime}/PipeRuntimeCoordinator.java | 2 +-
.../pipe/{ => task}/PipeTaskCoordinator.java | 23 ++-
.../pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 7 -
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ---
.../iotdb/db/pipe/agent/runtime/PipeLauncher.java | 166 +++++++++++++++++++++
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 9 ++
.../java/org/apache/iotdb/db/service/DataNode.java | 94 +-----------
.../db/sync/common/ClusterSyncInfoFetcher.java | 11 +-
.../src/main/thrift/confignode.thrift | 9 +-
16 files changed, 198 insertions(+), 170 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 11d2bd2912e..a23b6b44412 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -144,7 +144,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1624,18 +1623,10 @@ public class ConfigManager implements IManager {
public TGetAllPipeInfoResp getAllPipeInfo() {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().showPipes()
+ ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
: new TGetAllPipeInfoResp().setStatus(status);
}
- @Override
- public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
- TSStatus status = confirmLeader();
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
- : status;
- }
-
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 4a305f822bc..5eea6b30fdf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,7 +87,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -598,13 +597,6 @@ public interface IManager {
*/
TGetAllPipeInfoResp getAllPipeInfo();
- /**
- * Record PipeMessage
- *
- * @return TSStatus
- */
- TSStatus recordPipeMessage(TRecordPipeMessageReq req);
-
/**
* Get RegionId。used for Show cluster slots information in
* docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index ec9ffd63009..be9440796fa 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
public class PipeManager {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
similarity index 98%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
index 8943a28e8c2..7435344edbf 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.plugin;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
similarity index 97%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 515abfdbdd1..ba9bba61c6a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
similarity index 98%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 81df8587176..9a117a0fbd1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
similarity index 91%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index e01f4b785bc..573c1cb1a80 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
@@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -76,7 +75,14 @@ public class PipeTaskCoordinator {
return configManager.getProcedureManager().dropPipe(pipeName);
}
- public TGetAllPipeInfoResp showPipes() {
+ public TShowPipeResp showPipes(TShowPipeReq req) {
+ return ((PipeTableResp)
+ configManager.getConsensusManager().read(new
ShowPipePlanV2()).getDataset())
+ .filter(req.whereClause, req.pipeName)
+ .convertToTShowPipeResp();
+ }
+
+ public TGetAllPipeInfoResp getAllPipeInfo() {
try {
return ((PipeTableResp)
configManager.getConsensusManager().read(new
ShowPipePlanV2()).getDataset())
@@ -89,15 +95,4 @@ public class PipeTaskCoordinator {
Collections.emptyList());
}
}
-
- public TShowPipeResp showPipes(TShowPipeReq req) {
- return ((PipeTableResp)
- configManager.getConsensusManager().read(new
ShowPipePlanV2()).getDataset())
- .filter(req.whereClause, req.pipeName)
- .convertToTShowPipeResp();
- }
-
- public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index b4790661278..88f40bdc114 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 7c8268ecd02..d44df9ac580 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 22367d7c164..c2b44e07de1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -127,7 +127,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -909,12 +908,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.getAllPipeInfo();
}
- @Override
- @Deprecated
- public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
- return configManager.recordPipeMessage(req);
- }
-
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
if (req.isSetTimeStamp() && req.getType() !=
TConsensusGroupType.DataRegion) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ca35b446fc3..4376f6a4eec 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -96,7 +96,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1880,22 +1879,6 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
- @Override
- public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws
TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TSStatus status = client.recordPipeMessage(req);
- if (!updateConfigNodeLeader(status)) {
- return status;
- }
- } catch (TException e) {
- configLeader = null;
- }
- waitAndReconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
- }
-
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
new file mode 100644
index 00000000000..2d0d45ebc02
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeLauncher {
+
+ private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ public void launchPipePluginAgent(ResourcesInformationHolder
resourcesInformationHolder)
+ throws StartupException {
+ initPipePluginRelatedInstances();
+
+ if (resourcesInformationHolder.getPipePluginMetaList() == null
+ || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+ return;
+ }
+
+ final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList =
+
getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder);
+ int index = 0;
+ while (index < uninstalledOrConflictedPipePluginMetaList.size()) {
+ List<PipePluginMeta> curList = new ArrayList<>();
+ int offset = 0;
+ while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+ && index + offset <
uninstalledOrConflictedPipePluginMetaList.size()) {
+ curList.add(uninstalledOrConflictedPipePluginMetaList.get(index +
offset));
+ offset++;
+ }
+ index += (offset + 1);
+ fetchAndSavePipePluginJars(curList);
+ }
+
+ // create instances of pipe plugins and do registration
+ try {
+ for (PipePluginMeta meta :
resourcesInformationHolder.getPipePluginMetaList()) {
+ if (meta.isBuiltin()) {
+ continue;
+ }
+ PipeAgent.plugin().doRegister(meta);
+ }
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private void initPipePluginRelatedInstances() throws StartupException {
+ try {
+ PipePluginExecutableManager.setupAndGetInstance(
+ IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
+
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
+ } catch (IOException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
+ ResourcesInformationHolder resourcesInformationHolder) {
+ final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
+ for (PipePluginMeta pipePluginMeta :
resourcesInformationHolder.getPipePluginMetaList()) {
+ if (pipePluginMeta.isBuiltin()) {
+ continue;
+ }
+ // If jar does not exist, add current pipePluginMeta to list
+ if (!PipePluginExecutableManager.getInstance()
+ .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+ pipePluginMetaList.add(pipePluginMeta);
+ } else {
+ try {
+ // local jar has conflicts with jar on config node, add current
pipePluginMeta to list
+ if
(!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+ pipePluginMetaList.add(pipePluginMeta);
+ }
+ } catch (PipeManagementException e) {
+ pipePluginMetaList.add(pipePluginMeta);
+ }
+ }
+ }
+ return pipePluginMetaList;
+ }
+
+ private void fetchAndSavePipePluginJars(List<PipePluginMeta>
pipePluginMetaList)
+ throws StartupException {
+ try (ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ final List<String> jarNameList =
+
pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+ final TGetJarInListResp resp =
+ configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+ if (resp.getStatus().getCode() ==
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get pipe plugin jar from config
node.");
+ }
+ final List<ByteBuffer> jarList = resp.getJarList();
+ for (int i = 0; i < pipePluginMetaList.size(); i++) {
+ PipePluginExecutableManager.getInstance()
+ .saveToInstallDir(jarList.get(i),
pipePluginMetaList.get(i).getJarName());
+ }
+ } catch (IOException | TException | ClientManagerException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ public void launchPipeTaskAgent() throws StartupException {
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
+ if (getAllPipeInfoResp.getStatus().getCode()
+ == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get pipe task meta from config
node.");
+ }
+
+ PipeAgent.task()
+ .handlePipeMetaChanges(
+ getAllPipeInfoResp.getAllPipeInfo().stream()
+ .map(PipeMeta::deserialize)
+ .collect(Collectors.toList()));
+ } catch (StartupException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 1160ff4a845..2d85aac9e8c 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.agent.runtime;
+import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.slf4j.Logger;
@@ -29,6 +31,13 @@ public class PipeRuntimeAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRuntimeAgent.class);
+ public synchronized void launch(ResourcesInformationHolder
resourcesInformationHolder)
+ throws StartupException {
+ final PipeLauncher pipeLauncher = new PipeLauncher();
+ pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+ pipeLauncher.launchPipeTaskAgent();
+ }
+
public void report(PipeSubtask subtask) {
// TODO: terminate the task by the given taskID
LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3e0f39ac4ff..44a5690db8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
-import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -84,7 +82,6 @@ import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.InternalReporterType;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.udf.api.exception.UDFManagementException;
@@ -461,7 +458,7 @@ public class DataNode implements DataNodeMBean {
private void prepareResources() throws StartupException {
prepareUDFResources();
prepareTriggerResources();
- preparePipePluginResources();
+ preparePipeResources();
}
/** register services and set up DataNode */
@@ -836,93 +833,8 @@ public class DataNode implements DataNodeMBean {
}
}
- private void preparePipePluginResources() throws StartupException {
- initPipePluginRelatedInstance();
- if (resourcesInformationHolder.getPipePluginMetaList() == null
- || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
- return;
- }
-
- // get jars from config node
- List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
- int index = 0;
- while (index < pipePluginNeedJarList.size()) {
- List<PipePluginMeta> curList = new ArrayList<>();
- int offset = 0;
- while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
- && index + offset < pipePluginNeedJarList.size()) {
- curList.add(pipePluginNeedJarList.get(index + offset));
- offset++;
- }
- index += (offset + 1);
- getJarOfPipePlugins(curList);
- }
-
- // create instances of pipe plugins and do registration
- try {
- for (PipePluginMeta meta :
resourcesInformationHolder.getPipePluginMetaList()) {
- if (meta.isBuiltin()) {
- continue;
- }
- PipeAgent.plugin().doRegister(meta);
- }
- } catch (Exception e) {
- throw new StartupException(e);
- }
- }
-
- private void initPipePluginRelatedInstance() throws StartupException {
- try {
- PipePluginExecutableManager.setupAndGetInstance(
- config.getPipeTemporaryLibDir(), config.getPipeDir());
- PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
- } catch (IOException e) {
- throw new StartupException(e);
- }
- }
-
- private List<PipePluginMeta> getJarListForPipePlugin() {
- List<PipePluginMeta> res = new ArrayList<>();
- for (PipePluginMeta pipePluginMeta :
resourcesInformationHolder.getPipePluginMetaList()) {
- if (pipePluginMeta.isBuiltin()) {
- continue;
- }
- // If jar does not exist, add current pipePluginMeta to list
- if (!PipePluginExecutableManager.getInstance()
- .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
- res.add(pipePluginMeta);
- } else {
- try {
- // local jar has conflicts with jar on config node, add current
pipePluginMeta to list
- if
(!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
- res.add(pipePluginMeta);
- }
- } catch (PipeManagementException e) {
- res.add(pipePluginMeta);
- }
- }
- }
- return res;
- }
-
- private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
- throws StartupException {
- try (ConfigNodeClient configNodeClient =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- List<String> jarNameList =
-
pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
- TGetJarInListResp resp = configNodeClient.getPipePluginJar(new
TGetJarInListReq(jarNameList));
- if (resp.getStatus().getCode() ==
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
- throw new StartupException("Failed to get pipe plugin jar from config
node.");
- }
- List<ByteBuffer> jarList = resp.getJarList();
- for (int i = 0; i < pipePluginMetaList.size(); i++) {
- PipePluginExecutableManager.getInstance()
- .saveToInstallDir(jarList.get(i),
pipePluginMetaList.get(i).getJarName());
- }
- } catch (IOException | TException | ClientManagerException e) {
- throw new StartupException(e);
- }
+ private void preparePipeResources() throws StartupException {
+ PipeAgent.runtime().launch(resourcesInformationHolder);
}
private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 6c4c6282a7f..929a7ce4f47 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -126,15 +125,7 @@ public class ClusterSyncInfoFetcher implements
ISyncInfoFetcher {
@Override
public TSStatus recordMsg(String pipeName, PipeMessage message) {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TRecordPipeMessageReq req =
- new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
- return configNodeClient.recordPipeMessage(req);
- } catch (Exception e) {
- LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
- return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
- }
+ return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported");
}
// endregion
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 7607d0cedfc..13f3461981a 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -600,11 +600,7 @@ struct TGetPathsSetTemplatesResp {
2: optional list<string> pathList
}
-// SYNC
-struct TRecordPipeMessageReq{
- 1: required string pipeName
- 2: required binary message
-}
+// Pipe
struct TShowPipeInfo {
1: required string id
@@ -1294,9 +1290,6 @@ service IConfigNodeRPCService {
/* Get all pipe information. It is used for DataNode registration and
restart*/
TGetAllPipeInfoResp getAllPipeInfo();
- /* Get all pipe information. It is used for DataNode registration and
restart*/
- common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
-
// ======================================================
// TestTools
// ======================================================