This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-package-refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 72fc68413b7f6a047a7dabd1460d42019a3ee1cf Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 7 03:05:29 2023 +0800 Pipe: refactor package structure (pipe-api) --- .../persistence/pipe/PipePluginInfo.java | 16 +++++------ .../confignode/persistence/pipe/PipeTaskInfo.java | 23 ++++++++-------- .../pipe/plugin/CreatePipePluginProcedure.java | 8 +++--- .../impl/pipe/plugin/DropPipePluginProcedure.java | 6 ++-- .../runtime/PipeHandleLeaderChangeProcedure.java | 6 ++-- .../runtime/PipeHandleMetaChangeProcedure.java | 4 +-- .../pipe/task/AbstractOperatePipeProcedureV2.java | 5 ++-- .../impl/pipe/task/CreatePipeProcedureV2.java | 15 ++++------ .../impl/pipe/task/DropPipeProcedureV2.java | 13 ++++----- .../impl/pipe/task/StartPipeProcedureV2.java | 17 +++++------- .../impl/pipe/task/StopPipeProcedureV2.java | 17 +++++------- .../org/apache/iotdb/pipe/api/event/EventType.java | 26 ------------------ .../event/dml/insertion/TsFileInsertionEvent.java | 8 ------ .../api/exception/PipeManagementException.java | 32 ---------------------- .../service/PipePluginExecutableManager.java | 6 ++-- .../db/pipe/agent/plugin/PipePluginAgent.java | 19 ++++++------- .../db/pipe/agent/runtime/PipeAgentLauncher.java | 4 +-- .../pipe/collector/IoTDBDataRegionCollector.java | 4 +-- .../common/tsfile/PipeTsFileInsertionEvent.java | 5 ---- .../task/subtask/PipeConnectorSubtaskManager.java | 3 +- 20 files changed, 75 insertions(+), 162 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index f839f6251dc..22f0cb97943 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -38,7 +38,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; import org.apache.iotdb.db.pipe.config.PipeProcessorConstant; import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -93,14 +93,14 @@ public class PipePluginInfo implements SnapshotProcessor { public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) { // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { - throw new PipeManagementException( + throw new PipeException( String.format( "Failed to create PipePlugin [%s], the same name PipePlugin has been created", pluginName)); } if (pipePluginMetaKeeper.jarNameExistsAndMatchesMd5(jarName, jarMD5)) { - throw new PipeManagementException( + throw new PipeException( String.format( "Failed to create PipePlugin [%s], the same name Jar [%s] but different MD5 [%s] has existed", pluginName, jarName, jarMD5)); @@ -110,7 +110,7 @@ public class PipePluginInfo implements SnapshotProcessor { public void validateBeforeDroppingPipePlugin(String pluginName) { if (pipePluginMetaKeeper.containsPipePlugin(pluginName) && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) { - throw new PipeManagementException( + throw new PipeException( String.format( "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin", pluginName)); @@ -133,7 +133,7 @@ public class PipePluginInfo implements SnapshotProcessor { "Failed to create pipe, the pipe collector plugin %s does not exist", collectorPluginName); LOGGER.warn(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } final PipeParameters processorParameters = @@ -147,7 +147,7 @@ public class PipePluginInfo implements SnapshotProcessor { "Failed to create pipe, the pipe processor plugin %s does not exist", processorPluginName); LOGGER.warn(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } final PipeParameters connectorParameters = @@ -156,7 +156,7 @@ public class PipePluginInfo implements SnapshotProcessor { final String exceptionMessage = "Failed to create pipe, the pipe connector plugin is not specified"; LOGGER.warn(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } final String connectorPluginName = connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY); @@ -166,7 +166,7 @@ public class PipePluginInfo implements SnapshotProcessor { "Failed to create pipe, the pipe connector plugin %s does not exist", connectorPluginName); LOGGER.warn(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 8aaa0948f80..d2580f87a1d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStat import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.common.DataSet; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -74,8 +74,7 @@ public class PipeTaskInfo implements SnapshotProcessor { /////////////////////////////// Validator /////////////////////////////// - public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) - throws PipeManagementException { + public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws PipeException { if (!isPipeExisted(createPipeRequest.getPipeName())) { return; } @@ -85,15 +84,15 @@ public class PipeTaskInfo implements SnapshotProcessor { "Failed to create pipe %s, the pipe with the same name has been created", createPipeRequest.getPipeName()); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } - public void checkBeforeStartPipe(String pipeName) throws PipeManagementException { + public void checkBeforeStartPipe(String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = String.format("Failed to start pipe %s, the pipe does not exist", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } final PipeStatus pipeStatus = getPipeStatus(pipeName); @@ -101,22 +100,22 @@ public class PipeTaskInfo implements SnapshotProcessor { final String exceptionMessage = String.format("Failed to start pipe %s, the pipe is already running", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } if (pipeStatus == PipeStatus.DROPPED) { final String exceptionMessage = String.format("Failed to start pipe %s, the pipe is already dropped", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } } - public void checkBeforeStopPipe(String pipeName) throws PipeManagementException { + public void checkBeforeStopPipe(String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = String.format("Failed to stop pipe %s, the pipe does not exist", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } final PipeStatus pipeStatus = getPipeStatus(pipeName); @@ -124,13 +123,13 @@ public class PipeTaskInfo implements SnapshotProcessor { final String exceptionMessage = String.format("Failed to stop pipe %s, the pipe is already stop", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } if (pipeStatus == PipeStatus.DROPPED) { final String exceptionMessage = String.format("Failed to stop pipe %s, the pipe is already dropped", pipeName); LOGGER.info(exceptionMessage); - throw new PipeManagementException(exceptionMessage); + throw new PipeException(exceptionMessage); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index 88f40bdc114..cdefb0501b2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -36,7 +36,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure; import org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Binary; @@ -128,7 +128,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP pipePluginMeta.getPluginName(), pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5()); - } catch (PipeManagementException e) { + } catch (PipeException e) { // The pipe plugin has already created, we should end the procedure LOGGER.warn( "Pipe plugin {} is already created, end the CreatePipePluginProcedure({})", @@ -162,7 +162,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP final ConsensusWriteResponse response = configNodeManager.getConsensusManager().write(createPluginPlan); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES); @@ -181,7 +181,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP return Flow.HAS_MORE_STATE; } - throw new PipeManagementException( + throw new PipeException( String.format( "Failed to create pipe plugin instance [%s] on data nodes", pipePluginMeta.getPluginName())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index d44df9ac580..5b469725038 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -31,7 +31,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure; import org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -109,7 +109,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi try { pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName); - } catch (PipeManagementException e) { + } catch (PipeException e) { // if the pipe plugin is a built-in plugin, we should not drop it LOGGER.warn(e.getMessage()); setFailure(new ProcedureException(e.getMessage())); @@ -133,7 +133,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi return Flow.HAS_MORE_STATE; } - throw new PipeManagementException( + throw new PipeException( String.format("Failed to drop pipe plugin %s on data nodes", pluginName)); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 3cbcebe6f6b..dc56840f114 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -27,7 +27,7 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -95,7 +95,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final ConsensusWriteResponse response = env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @@ -137,7 +137,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final ConsensusWriteResponse response = env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 39948e9fbb9..2911b95e839 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -233,7 +233,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .getConsensusManager() .write(new PipeHandleMetaChangePlan(pipeMetaList)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java index 133a74854be..023585bb85d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.confignode.procedure.impl.pipe.task; -import org.apache.iotdb.commons.exception.sync.PipeException; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -27,7 +26,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedExcepti import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -196,7 +195,7 @@ public abstract class AbstractOperatePipeProcedureV2 if (RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList)).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new PipeManagementException("Failed to push pipe meta list to data nodes"); + throw new PipeException("Failed to push pipe meta list to data nodes"); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index b7ab5ef62d5..aefab3c03f5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -33,7 +33,6 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -70,8 +69,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromValidateTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info( "CreatePipeProcedureV2: executeFromValidateTask({})", createPipeRequest.getPipeName()); @@ -84,8 +82,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info( "CreatePipeProcedureV2: executeFromCalculateInfoForTask({})", createPipeRequest.getPipeName()); @@ -111,7 +108,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { @Override protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) - throws PipeManagementException { + throws PipeException { LOGGER.info( "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})", createPipeRequest.getPipeName()); @@ -121,13 +118,13 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info( "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})", createPipeRequest.getPipeName()); @@ -161,7 +158,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new DropPipePlanV2(createPipeRequest.getPipeName())); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 5a715ae3534..3edd2ec5d57 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -24,7 +24,6 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -55,8 +54,7 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromValidateTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName); env.getConfigManager() @@ -67,27 +65,26 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("DropPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName); // Do nothing } @Override protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) - throws PipeManagementException { + throws PipeException { LOGGER.info("DropPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); final ConsensusWriteResponse response = env.getConfigManager().getConsensusManager().write(new DropPipePlanV2(pipeName)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName); pushPipeMetaToDataNodes(env); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index ecd4cdc57c0..7bdd3bbd6fa 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -56,8 +55,7 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromValidateTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName); env.getConfigManager() @@ -68,15 +66,14 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("StartPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName); // Do nothing } @Override protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) - throws PipeManagementException { + throws PipeException { LOGGER.info("StartPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); final ConsensusWriteResponse response = @@ -84,13 +81,13 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName); pushPipeMetaToDataNodes(env); @@ -117,13 +114,13 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName); pushPipeMetaToDataNodes(env); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index fcd015fb940..2408bac3209 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -56,8 +55,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromValidateTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName); env.getConfigManager() @@ -68,15 +66,14 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) - throws PipeManagementException { + protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName); // Do nothing } @Override protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) - throws PipeManagementException { + throws PipeException { LOGGER.info("StopPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); final ConsensusWriteResponse response = @@ -84,13 +81,13 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName); pushPipeMetaToDataNodes(env); @@ -117,13 +114,13 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getConsensusManager() .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); if (!response.isSuccessful()) { - throw new PipeManagementException(response.getErrorMessage()); + throw new PipeException(response.getErrorMessage()); } } @Override protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeManagementException, IOException { + throws PipeException, IOException { LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName); pushPipeMetaToDataNodes(env); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java deleted file mode 100644 index 20be75367d5..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.event; - -public enum EventType { - TABLET_INSERTION, - TSFILE_INSERTION, - DELETION, -} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java index 325f0683cb7..a3e44fd55ed 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java @@ -33,12 +33,4 @@ public interface TsFileInsertionEvent extends Event { * @return the list of TsFileInsertionEvent */ Iterable<TabletInsertionEvent> toTabletInsertionEvents(); - - /** - * The method is used to compact several TabletInsertionEvents into one TsFileInsertionEvent. The - * underlying data in TabletInsertionEvents will be stored into a TsFile. - * - * @return TsFileInsertionEvent - */ - TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable); } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java deleted file mode 100644 index 5ec3b2039b0..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.exception; - -public class PipeManagementException extends PipeException { - - public PipeManagementException(String message, Throwable cause) { - super(message); - this.initCause(cause); - } - - public PipeManagementException(String message) { - super(message); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java index 843df30c6ba..5d9d7da27e5 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java @@ -22,7 +22,7 @@ package org.apache.iotdb.commons.pipe.plugin.service; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; @@ -41,7 +41,7 @@ public class PipePluginExecutableManager extends ExecutableManager { super(temporaryLibRoot, libRoot); } - public boolean isLocalJarMatched(PipePluginMeta pipePluginMeta) throws PipeManagementException { + public boolean isLocalJarMatched(PipePluginMeta pipePluginMeta) throws PipeException { final String pluginName = pipePluginMeta.getPluginName(); final String md5FilePath = pluginName + ".txt"; @@ -69,7 +69,7 @@ public class PipePluginExecutableManager extends ExecutableManager { + "because error occurred when trying to compute md5 of jar file for function %s ", pluginName, pluginName); LOGGER.warn(errorMessage, e); - throw new PipeManagementException(errorMessage); + throw new PipeException(errorMessage); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index b0fe93b4533..454cf5c1788 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -33,7 +33,7 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ public class PipePluginAgent { } } - private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws PipeManagementException { + private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws PipeException { final String pluginName = pipePluginMeta.getPluginName(); final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); if (information == null) { @@ -95,7 +95,7 @@ public class PipePluginAgent { "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.", pluginName); LOGGER.warn(errorMessage); - throw new PipeManagementException(errorMessage); + throw new PipeException(errorMessage); } if (PipePluginExecutableManager.getInstance() @@ -107,7 +107,7 @@ public class PipePluginAgent { + "because existed md5 of jar file for pipe plugin %s is different from the new jar file.", pluginName, pluginName); LOGGER.warn(errMsg); - throw new PipeManagementException(errMsg); + throw new PipeException(errMsg); } // if the pipe plugin is already registered and the jar file is the same, do nothing @@ -125,10 +125,9 @@ public class PipePluginAgent { * the PipePluginClassLoader and its instance will be created to ensure that it can be loaded. * * @param pipePluginMeta the meta information of the PipePlugin - * @throws PipeManagementException if the PipePlugin can not be loaded or its instance can not be - * created + * @throws PipeException if the PipePlugin can not be loaded or its instance can not be created */ - public void doRegister(PipePluginMeta pipePluginMeta) throws PipeManagementException { + public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException { final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); @@ -155,7 +154,7 @@ public class PipePluginAgent { "Failed to register PipePlugin %s(%s), because its instance can not be constructed successfully. Exception: %s", pluginName.toUpperCase(), className, e); LOGGER.warn(errorMessage, e); - throw new PipeManagementException(errorMessage); + throw new PipeException(errorMessage); } } @@ -175,7 +174,7 @@ public class PipePluginAgent { String errorMessage = String.format("Failed to deregister builtin PipePlugin %s.", pluginName); LOGGER.warn(errorMessage); - throw new PipeManagementException(errorMessage); + throw new PipeException(errorMessage); } // remove anyway @@ -211,7 +210,7 @@ public class PipePluginAgent { public PipeConnector reflectConnector(PipeParameters connectorParameters) { if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) { - throw new PipeManagementException( + throw new PipeException( "Failed to reflect PipeConnector instance because 'connector' is not specified in the parameters."); } return (PipeConnector) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index 16f3716bc13..69786989982 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -35,7 +35,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.service.ResourcesInformationHolder; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -122,7 +122,7 @@ class PipeAgentLauncher { if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) { pipePluginMetaList.add(pipePluginMeta); } - } catch (PipeManagementException e) { + } catch (PipeException e) { pipePluginMetaList.add(pipePluginMeta); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java index 663ee0d8f43..bb247981bf6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java @@ -36,7 +36,7 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,7 +228,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) { if (exceptionHolder.get() != null) { - throw new PipeManagementException("failed to start collectors.", exceptionHolder.get()); + throw new PipeException("failed to start collectors.", exceptionHolder.get()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index c4c21ac9114..785948b66a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -165,11 +165,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } } - @Override - public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) { - throw new UnsupportedOperationException("Not implemented yet"); - } - /////////////////////////// Object /////////////////////////// @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java index 26c21d9545f..a6e338aae2e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java @@ -32,7 +32,6 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.pipe.api.exception.PipeManagementException; import java.util.HashMap; import java.util.Map; @@ -68,7 +67,7 @@ public class PipeConnectorSubtaskManager { // TODO: use runtimeConfiguration to configure PipeConnector pipeConnector.handshake(); } catch (Exception e) { - throw new PipeManagementException( + throw new PipeException( "Failed to construct PipeConnector, because of " + e.getMessage(), e); }
