This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch revert-IOTDB-5893 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9124e21b8a49206a054b0c3fee41f071fb97bb14 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun May 21 23:14:49 2023 +0800 Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)" This reverts commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2. --- .../iotdb/confignode/manager/ConfigManager.java | 11 +- .../apache/iotdb/confignode/manager/IManager.java | 8 + .../iotdb/confignode/manager/pipe/PipeManager.java | 3 - .../manager/pipe/{runtime => }/PipeMetaSyncer.java | 2 +- .../pipe/{plugin => }/PipePluginCoordinator.java | 2 +- .../pipe/{runtime => }/PipeRuntimeCoordinator.java | 2 +- .../pipe/{task => }/PipeTaskCoordinator.java | 23 +-- .../pipe/plugin/CreatePipePluginProcedure.java | 2 +- .../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 7 + .../apache/iotdb/db/client/ConfigNodeClient.java | 17 +++ .../iotdb/db/pipe/agent/runtime/PipeLauncher.java | 166 --------------------- .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 9 -- .../java/org/apache/iotdb/db/service/DataNode.java | 94 +++++++++++- .../db/sync/common/ClusterSyncInfoFetcher.java | 11 +- .../src/main/thrift/confignode.thrift | 9 +- 16 files changed, 170 insertions(+), 198 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index a23b6b44412..11d2bd2912e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -144,6 +144,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; @@ -1623,10 +1624,18 @@ public class ConfigManager implements IManager { public TGetAllPipeInfoResp getAllPipeInfo() { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo() + ? pipeManager.getPipeTaskCoordinator().showPipes() : new TGetAllPipeInfoResp().setStatus(status); } + @Override + public TSStatus recordPipeMessage(TRecordPipeMessageReq req) { + TSStatus status = confirmLeader(); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req) + : status; + } + @Override public TGetRegionIdResp getRegionId(TGetRegionIdReq req) { TSStatus status = confirmLeader(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 5eea6b30fdf..4a305f822bc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; @@ -597,6 +598,13 @@ public interface IManager { */ TGetAllPipeInfoResp getAllPipeInfo(); + /** + * Record PipeMessage + * + * @return TSStatus + */ + TSStatus recordPipeMessage(TRecordPipeMessageReq req); + /** * Get RegionId。used for Show cluster slots information in * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md. diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java index be9440796fa..ec9ffd63009 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java @@ -20,9 +20,6 @@ package org.apache.iotdb.confignode.manager.pipe; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator; -import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator; -import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator; import org.apache.iotdb.confignode.persistence.pipe.PipeInfo; public class PipeManager { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java similarity index 97% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java index ba9bba61c6a..515abfdbdd1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.runtime; +package org.apache.iotdb.confignode.manager.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java similarity index 98% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java index 7435344edbf..8943a28e8c2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.plugin; +package org.apache.iotdb.confignode.manager.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java similarity index 98% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java index 9a117a0fbd1..81df8587176 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.runtime; +package org.apache.iotdb.confignode.manager.pipe; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java similarity index 91% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java index 573c1cb1a80..e01f4b785bc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.task; +package org.apache.iotdb.confignode.manager.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2; @@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.rpc.TSStatusCode; @@ -75,14 +76,7 @@ public class PipeTaskCoordinator { return configManager.getProcedureManager().dropPipe(pipeName); } - public TShowPipeResp showPipes(TShowPipeReq req) { - return ((PipeTableResp) - configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) - .filter(req.whereClause, req.pipeName) - .convertToTShowPipeResp(); - } - - public TGetAllPipeInfoResp getAllPipeInfo() { + public TGetAllPipeInfoResp showPipes() { try { return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) @@ -95,4 +89,15 @@ public class PipeTaskCoordinator { Collections.emptyList()); } } + + public TShowPipeResp showPipes(TShowPipeReq req) { + return ((PipeTableResp) + configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) + .filter(req.whereClause, req.pipeName) + .convertToTShowPipeResp(); + } + + public TSStatus recordPipeMessage(TRecordPipeMessageReq req) { + throw new UnsupportedOperationException("Not implemented yet"); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index 88f40bdc114..b4790661278 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator; +import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index d44df9ac580..7c8268ecd02 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.plugin; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; -import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator; +import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index c2b44e07de1..22367d7c164 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -127,6 +127,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq; @@ -908,6 +909,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac return configManager.getAllPipeInfo(); } + @Override + @Deprecated + public TSStatus recordPipeMessage(TRecordPipeMessageReq req) { + return configManager.recordPipeMessage(req); + } + @Override public TGetRegionIdResp getRegionId(TGetRegionIdReq req) { if (req.isSetTimeStamp() && req.getType() != TConsensusGroupType.DataRegion) { diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java index 4376f6a4eec..ca35b446fc3 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java @@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq; @@ -1879,6 +1880,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie throw new TException(MSG_RECONNECTION_FAIL); } + @Override + public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException { + for (int i = 0; i < RETRY_NUM; i++) { + try { + TSStatus status = client.recordPipeMessage(req); + if (!updateConfigNodeLeader(status)) { + return status; + } + } catch (TException e) { + configLeader = null; + } + waitAndReconnect(); + } + throw new TException(MSG_RECONNECTION_FAIL); + } + @Override public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException { for (int i = 0; i < RETRY_NUM; i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java deleted file mode 100644 index 2d0d45ebc02..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.agent.runtime; - -import org.apache.iotdb.commons.client.exception.ClientManagerException; -import org.apache.iotdb.commons.exception.StartupException; -import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; -import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager; -import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager; -import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; -import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq; -import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; -import org.apache.iotdb.db.client.ConfigNodeClient; -import org.apache.iotdb.db.client.ConfigNodeClientManager; -import org.apache.iotdb.db.client.ConfigNodeInfo; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.service.ResourcesInformationHolder; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.apache.thrift.TException; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class PipeLauncher { - - private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); - - public void launchPipePluginAgent(ResourcesInformationHolder resourcesInformationHolder) - throws StartupException { - initPipePluginRelatedInstances(); - - if (resourcesInformationHolder.getPipePluginMetaList() == null - || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) { - return; - } - - final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList = - getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder); - int index = 0; - while (index < uninstalledOrConflictedPipePluginMetaList.size()) { - List<PipePluginMeta> curList = new ArrayList<>(); - int offset = 0; - while (offset < ResourcesInformationHolder.getJarNumOfOneRpc() - && index + offset < uninstalledOrConflictedPipePluginMetaList.size()) { - curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset)); - offset++; - } - index += (offset + 1); - fetchAndSavePipePluginJars(curList); - } - - // create instances of pipe plugins and do registration - try { - for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { - if (meta.isBuiltin()) { - continue; - } - PipeAgent.plugin().doRegister(meta); - } - } catch (Exception e) { - throw new StartupException(e); - } - } - - private void initPipePluginRelatedInstances() throws StartupException { - try { - PipePluginExecutableManager.setupAndGetInstance( - IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir()); - PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir()); - } catch (IOException e) { - throw new StartupException(e); - } - } - - private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList( - ResourcesInformationHolder resourcesInformationHolder) { - final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>(); - for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) { - if (pipePluginMeta.isBuiltin()) { - continue; - } - // If jar does not exist, add current pipePluginMeta to list - if (!PipePluginExecutableManager.getInstance() - .hasFileUnderInstallDir(pipePluginMeta.getJarName())) { - pipePluginMetaList.add(pipePluginMeta); - } else { - try { - // local jar has conflicts with jar on config node, add current pipePluginMeta to list - if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) { - pipePluginMetaList.add(pipePluginMeta); - } - } catch (PipeManagementException e) { - pipePluginMetaList.add(pipePluginMeta); - } - } - } - return pipePluginMetaList; - } - - private void fetchAndSavePipePluginJars(List<PipePluginMeta> pipePluginMetaList) - throws StartupException { - try (ConfigNodeClient configNodeClient = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final List<String> jarNameList = - pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList()); - final TGetJarInListResp resp = - configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList)); - if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) { - throw new StartupException("Failed to get pipe plugin jar from config node."); - } - final List<ByteBuffer> jarList = resp.getJarList(); - for (int i = 0; i < pipePluginMetaList.size(); i++) { - PipePluginExecutableManager.getInstance() - .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName()); - } - } catch (IOException | TException | ClientManagerException e) { - throw new StartupException(e); - } - } - - public void launchPipeTaskAgent() throws StartupException { - try (final ConfigNodeClient configNodeClient = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo(); - if (getAllPipeInfoResp.getStatus().getCode() - == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) { - throw new StartupException("Failed to get pipe task meta from config node."); - } - - PipeAgent.task() - .handlePipeMetaChanges( - getAllPipeInfoResp.getAllPipeInfo().stream() - .map(PipeMeta::deserialize) - .collect(Collectors.toList())); - } catch (StartupException e) { - throw e; - } catch (Exception e) { - throw new StartupException(e); - } - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 2d85aac9e8c..1160ff4a845 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -19,9 +19,7 @@ package org.apache.iotdb.db.pipe.agent.runtime; -import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; -import org.apache.iotdb.db.service.ResourcesInformationHolder; import org.apache.iotdb.pipe.api.exception.PipeRuntimeException; import org.slf4j.Logger; @@ -31,13 +29,6 @@ public class PipeRuntimeAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class); - public synchronized void launch(ResourcesInformationHolder resourcesInformationHolder) - throws StartupException { - final PipeLauncher pipeLauncher = new PipeLauncher(); - pipeLauncher.launchPipePluginAgent(resourcesInformationHolder); - pipeLauncher.launchPipeTaskAgent(); - } - public void report(PipeSubtask subtask) { // TODO: terminate the task by the given taskID LOGGER.warn( diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 44a5690db8c..3e0f39ac4ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -31,6 +31,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; +import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager; +import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager; import org.apache.iotdb.commons.service.JMXService; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.service.metric.MetricService; @@ -82,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager; import org.apache.iotdb.db.wal.utils.WALMode; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.utils.InternalReporterType; +import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.udf.api.exception.UDFManagementException; @@ -458,7 +461,7 @@ public class DataNode implements DataNodeMBean { private void prepareResources() throws StartupException { prepareUDFResources(); prepareTriggerResources(); - preparePipeResources(); + preparePipePluginResources(); } /** register services and set up DataNode */ @@ -833,8 +836,93 @@ public class DataNode implements DataNodeMBean { } } - private void preparePipeResources() throws StartupException { - PipeAgent.runtime().launch(resourcesInformationHolder); + private void preparePipePluginResources() throws StartupException { + initPipePluginRelatedInstance(); + if (resourcesInformationHolder.getPipePluginMetaList() == null + || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) { + return; + } + + // get jars from config node + List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin(); + int index = 0; + while (index < pipePluginNeedJarList.size()) { + List<PipePluginMeta> curList = new ArrayList<>(); + int offset = 0; + while (offset < ResourcesInformationHolder.getJarNumOfOneRpc() + && index + offset < pipePluginNeedJarList.size()) { + curList.add(pipePluginNeedJarList.get(index + offset)); + offset++; + } + index += (offset + 1); + getJarOfPipePlugins(curList); + } + + // create instances of pipe plugins and do registration + try { + for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { + if (meta.isBuiltin()) { + continue; + } + PipeAgent.plugin().doRegister(meta); + } + } catch (Exception e) { + throw new StartupException(e); + } + } + + private void initPipePluginRelatedInstance() throws StartupException { + try { + PipePluginExecutableManager.setupAndGetInstance( + config.getPipeTemporaryLibDir(), config.getPipeDir()); + PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir()); + } catch (IOException e) { + throw new StartupException(e); + } + } + + private List<PipePluginMeta> getJarListForPipePlugin() { + List<PipePluginMeta> res = new ArrayList<>(); + for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) { + if (pipePluginMeta.isBuiltin()) { + continue; + } + // If jar does not exist, add current pipePluginMeta to list + if (!PipePluginExecutableManager.getInstance() + .hasFileUnderInstallDir(pipePluginMeta.getJarName())) { + res.add(pipePluginMeta); + } else { + try { + // local jar has conflicts with jar on config node, add current pipePluginMeta to list + if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) { + res.add(pipePluginMeta); + } + } catch (PipeManagementException e) { + res.add(pipePluginMeta); + } + } + } + return res; + } + + private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList) + throws StartupException { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + List<String> jarNameList = + pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList()); + TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList)); + if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) { + throw new StartupException("Failed to get pipe plugin jar from config node."); + } + List<ByteBuffer> jarList = resp.getJarList(); + for (int i = 0; i < pipePluginMetaList.size(); i++) { + PipePluginExecutableManager.getInstance() + .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName()); + } + } catch (IOException | TException | ClientManagerException e) { + throw new StartupException(e); + } } private void getPipeInformationList(List<ByteBuffer> allPipeInformation) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java index 929a7ce4f47..6c4c6282a7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; +import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq; import org.apache.iotdb.db.client.ConfigNodeClient; import org.apache.iotdb.db.client.ConfigNodeClientManager; import org.apache.iotdb.db.client.ConfigNodeInfo; @@ -125,7 +126,15 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher { @Override public TSStatus recordMsg(String pipeName, PipeMessage message) { - return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported"); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TRecordPipeMessageReq req = + new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer()); + return configNodeClient.recordPipeMessage(req); + } catch (Exception e) { + LOGGER.error("RecordMsg error because {}", e.getMessage(), e); + return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage()); + } } // endregion diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift index 13f3461981a..7607d0cedfc 100644 --- a/thrift-confignode/src/main/thrift/confignode.thrift +++ b/thrift-confignode/src/main/thrift/confignode.thrift @@ -600,7 +600,11 @@ struct TGetPathsSetTemplatesResp { 2: optional list<string> pathList } -// Pipe +// SYNC +struct TRecordPipeMessageReq{ + 1: required string pipeName + 2: required binary message +} struct TShowPipeInfo { 1: required string id @@ -1290,6 +1294,9 @@ service IConfigNodeRPCService { /* Get all pipe information. It is used for DataNode registration and restart*/ TGetAllPipeInfoResp getAllPipeInfo(); + /* Get all pipe information. It is used for DataNode registration and restart*/ + common.TSStatus recordPipeMessage(TRecordPipeMessageReq req); + // ====================================================== // TestTools // ======================================================
