This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-package-refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 72fc68413b7f6a047a7dabd1460d42019a3ee1cf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 7 03:05:29 2023 +0800

    Pipe: refactor package structure (pipe-api)
---
 .../persistence/pipe/PipePluginInfo.java           | 16 +++++------
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 23 ++++++++--------
 .../pipe/plugin/CreatePipePluginProcedure.java     |  8 +++---
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  6 ++--
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  6 ++--
 .../runtime/PipeHandleMetaChangeProcedure.java     |  4 +--
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  5 ++--
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 15 ++++------
 .../impl/pipe/task/DropPipeProcedureV2.java        | 13 ++++-----
 .../impl/pipe/task/StartPipeProcedureV2.java       | 17 +++++-------
 .../impl/pipe/task/StopPipeProcedureV2.java        | 17 +++++-------
 .../org/apache/iotdb/pipe/api/event/EventType.java | 26 ------------------
 .../event/dml/insertion/TsFileInsertionEvent.java  |  8 ------
 .../api/exception/PipeManagementException.java     | 32 ----------------------
 .../service/PipePluginExecutableManager.java       |  6 ++--
 .../db/pipe/agent/plugin/PipePluginAgent.java      | 19 ++++++-------
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  4 +--
 .../pipe/collector/IoTDBDataRegionCollector.java   |  4 +--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  5 ----
 .../task/subtask/PipeConnectorSubtaskManager.java  |  3 +-
 20 files changed, 75 insertions(+), 162 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index f839f6251dc..22f0cb97943 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -93,14 +93,14 @@ public class PipePluginInfo implements SnapshotProcessor {
   public void validateBeforeCreatingPipePlugin(String pluginName, String 
jarName, String jarMD5) {
     // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
-      throw new PipeManagementException(
+      throw new PipeException(
           String.format(
               "Failed to create PipePlugin [%s], the same name PipePlugin has 
been created",
               pluginName));
     }
 
     if (pipePluginMetaKeeper.jarNameExistsAndMatchesMd5(jarName, jarMD5)) {
-      throw new PipeManagementException(
+      throw new PipeException(
           String.format(
               "Failed to create PipePlugin [%s], the same name Jar [%s] but 
different MD5 [%s] has existed",
               pluginName, jarName, jarMD5));
@@ -110,7 +110,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   public void validateBeforeDroppingPipePlugin(String pluginName) {
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
         && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
-      throw new PipeManagementException(
+      throw new PipeException(
           String.format(
               "Failed to drop PipePlugin [%s], the PipePlugin is a built-in 
PipePlugin",
               pluginName));
@@ -133,7 +133,7 @@ public class PipePluginInfo implements SnapshotProcessor {
               "Failed to create pipe, the pipe collector plugin %s does not 
exist",
               collectorPluginName);
       LOGGER.warn(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
 
     final PipeParameters processorParameters =
@@ -147,7 +147,7 @@ public class PipePluginInfo implements SnapshotProcessor {
               "Failed to create pipe, the pipe processor plugin %s does not 
exist",
               processorPluginName);
       LOGGER.warn(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
 
     final PipeParameters connectorParameters =
@@ -156,7 +156,7 @@ public class PipePluginInfo implements SnapshotProcessor {
       final String exceptionMessage =
           "Failed to create pipe, the pipe connector plugin is not specified";
       LOGGER.warn(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
     final String connectorPluginName =
         connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
@@ -166,7 +166,7 @@ public class PipePluginInfo implements SnapshotProcessor {
               "Failed to create pipe, the pipe connector plugin %s does not 
exist",
               connectorPluginName);
       LOGGER.warn(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 8aaa0948f80..d2580f87a1d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -34,7 +34,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStat
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -74,8 +74,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest)
-      throws PipeManagementException {
+  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws 
PipeException {
     if (!isPipeExisted(createPipeRequest.getPipeName())) {
       return;
     }
@@ -85,15 +84,15 @@ public class PipeTaskInfo implements SnapshotProcessor {
             "Failed to create pipe %s, the pipe with the same name has been 
created",
             createPipeRequest.getPipeName());
     LOGGER.info(exceptionMessage);
-    throw new PipeManagementException(exceptionMessage);
+    throw new PipeException(exceptionMessage);
   }
 
-  public void checkBeforeStartPipe(String pipeName) throws 
PipeManagementException {
+  public void checkBeforeStartPipe(String pipeName) throws PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe does not exist", 
pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
@@ -101,22 +100,22 @@ public class PipeTaskInfo implements SnapshotProcessor {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe is already 
running", pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe is already 
dropped", pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
   }
 
-  public void checkBeforeStopPipe(String pipeName) throws 
PipeManagementException {
+  public void checkBeforeStopPipe(String pipeName) throws PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe does not exist", 
pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
@@ -124,13 +123,13 @@ public class PipeTaskInfo implements SnapshotProcessor {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe is already stop", 
pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
     if (pipeStatus == PipeStatus.DROPPED) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe is already dropped", 
pipeName);
       LOGGER.info(exceptionMessage);
-      throw new PipeManagementException(exceptionMessage);
+      throw new PipeException(exceptionMessage);
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 88f40bdc114..cdefb0501b2 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -36,7 +36,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -128,7 +128,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
               pipePluginMeta.getPluginName(),
               pipePluginMeta.getJarName(),
               pipePluginMeta.getJarMD5());
-    } catch (PipeManagementException e) {
+    } catch (PipeException e) {
       // The pipe plugin has already created, we should end the procedure
       LOGGER.warn(
           "Pipe plugin {} is already created, end the 
CreatePipePluginProcedure({})",
@@ -162,7 +162,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
     final ConsensusWriteResponse response =
         configNodeManager.getConsensusManager().write(createPluginPlan);
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
 
     setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES);
@@ -181,7 +181,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
       return Flow.HAS_MORE_STATE;
     }
 
-    throw new PipeManagementException(
+    throw new PipeException(
         String.format(
             "Failed to create pipe plugin instance [%s] on data nodes",
             pipePluginMeta.getPluginName()));
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index d44df9ac580..5b469725038 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -109,7 +109,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
 
     try {
       
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
-    } catch (PipeManagementException e) {
+    } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
       LOGGER.warn(e.getMessage());
       setFailure(new ProcedureException(e.getMessage()));
@@ -133,7 +133,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
       return Flow.HAS_MORE_STATE;
     }
 
-    throw new PipeManagementException(
+    throw new PipeException(
         String.format("Failed to drop pipe plugin %s on data nodes", 
pluginName));
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 3cbcebe6f6b..dc56840f114 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -95,7 +95,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     final ConsensusWriteResponse response =
         
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
@@ -137,7 +137,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     final ConsensusWriteResponse response =
         
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 39948e9fbb9..2911b95e839 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -34,7 +34,7 @@ import 
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -233,7 +233,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
             .getConsensusManager()
             .write(new PipeHandleMetaChangePlan(pipeMetaList));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 133a74854be..023585bb85d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -27,7 +26,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedExcepti
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -196,7 +195,7 @@ public abstract class AbstractOperatePipeProcedureV2
 
     if 
(RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList)).getCode()
         != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeManagementException("Failed to push pipe meta list to data 
nodes");
+      throw new PipeException("Failed to push pipe meta list to data nodes");
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index b7ab5ef62d5..aefab3c03f5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -33,7 +33,6 @@ import 
org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -70,8 +69,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromValidateTask({})", 
createPipeRequest.getPipeName());
 
@@ -84,8 +82,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromCalculateInfoForTask({})",
         createPipeRequest.getPipeName());
@@ -111,7 +108,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env)
-      throws PipeManagementException {
+      throws PipeException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})",
         createPipeRequest.getPipeName());
@@ -121,13 +118,13 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
         createPipeRequest.getPipeName());
@@ -161,7 +158,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new DropPipePlanV2(createPipeRequest.getPipeName()));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 5a715ae3534..3edd2ec5d57 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -55,8 +54,7 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
     env.getConfigManager()
@@ -67,27 +65,26 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info("DropPipeProcedureV2: executeFromCalculateInfoForTask({})", 
pipeName);
     // Do nothing
   }
 
   @Override
   protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env)
-      throws PipeManagementException {
+      throws PipeException {
     LOGGER.info("DropPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
     final ConsensusWriteResponse response =
         env.getConfigManager().getConsensusManager().write(new 
DropPipePlanV2(pipeName));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     pushPipeMetaToDataNodes(env);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index ecd4cdc57c0..7bdd3bbd6fa 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -56,8 +55,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
     env.getConfigManager()
@@ -68,15 +66,14 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info("StartPipeProcedureV2: executeFromCalculateInfoForTask({})", 
pipeName);
     // Do nothing
   }
 
   @Override
   protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env)
-      throws PipeManagementException {
+      throws PipeException {
     LOGGER.info("StartPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
     final ConsensusWriteResponse response =
@@ -84,13 +81,13 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     pushPipeMetaToDataNodes(env);
@@ -117,13 +114,13 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
     pushPipeMetaToDataNodes(env);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index fcd015fb940..2408bac3209 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -56,8 +55,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
     env.getConfigManager()
@@ -68,15 +66,14 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
-      throws PipeManagementException {
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", 
pipeName);
     // Do nothing
   }
 
   @Override
   protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env)
-      throws PipeManagementException {
+      throws PipeException {
     LOGGER.info("StopPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
     final ConsensusWriteResponse response =
@@ -84,13 +81,13 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     pushPipeMetaToDataNodes(env);
@@ -117,13 +114,13 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             .getConsensusManager()
             .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
     if (!response.isSuccessful()) {
-      throw new PipeManagementException(response.getErrorMessage());
+      throw new PipeException(response.getErrorMessage());
     }
   }
 
   @Override
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeManagementException, IOException {
+      throws PipeException, IOException {
     LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
     pushPipeMetaToDataNodes(env);
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
deleted file mode 100644
index 20be75367d5..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.event;
-
-public enum EventType {
-  TABLET_INSERTION,
-  TSFILE_INSERTION,
-  DELETION,
-}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 325f0683cb7..a3e44fd55ed 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -33,12 +33,4 @@ public interface TsFileInsertionEvent extends Event {
    * @return the list of TsFileInsertionEvent
    */
   Iterable<TabletInsertionEvent> toTabletInsertionEvents();
-
-  /**
-   * The method is used to compact several TabletInsertionEvents into one 
TsFileInsertionEvent. The
-   * underlying data in TabletInsertionEvents will be stored into a TsFile.
-   *
-   * @return TsFileInsertionEvent
-   */
-  TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> 
iterable);
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java
deleted file mode 100644
index 5ec3b2039b0..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeManagementException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.exception;
-
-public class PipeManagementException extends PipeException {
-
-  public PipeManagementException(String message, Throwable cause) {
-    super(message);
-    this.initCause(cause);
-  }
-
-  public PipeManagementException(String message) {
-    super(message);
-  }
-}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
index 843df30c6ba..5d9d7da27e5 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.commons.pipe.plugin.service;
 import org.apache.iotdb.commons.executable.ExecutableManager;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
     super(temporaryLibRoot, libRoot);
   }
 
-  public boolean isLocalJarMatched(PipePluginMeta pipePluginMeta) throws 
PipeManagementException {
+  public boolean isLocalJarMatched(PipePluginMeta pipePluginMeta) throws 
PipeException {
     final String pluginName = pipePluginMeta.getPluginName();
     final String md5FilePath = pluginName + ".txt";
 
@@ -69,7 +69,7 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
                   + "because error occurred when trying to compute md5 of jar 
file for function %s ",
               pluginName, pluginName);
       LOGGER.warn(errorMessage, e);
-      throw new PipeManagementException(errorMessage);
+      throw new PipeException(errorMessage);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index b0fe93b4533..454cf5c1788 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class PipePluginAgent {
     }
   }
 
-  private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws 
PipeManagementException {
+  private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws 
PipeException {
     final String pluginName = pipePluginMeta.getPluginName();
     final PipePluginMeta information = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
     if (information == null) {
@@ -95,7 +95,7 @@ public class PipePluginAgent {
               "Failed to register PipePlugin %s, because the given PipePlugin 
name is the same as a built-in PipePlugin name.",
               pluginName);
       LOGGER.warn(errorMessage);
-      throw new PipeManagementException(errorMessage);
+      throw new PipeException(errorMessage);
     }
 
     if (PipePluginExecutableManager.getInstance()
@@ -107,7 +107,7 @@ public class PipePluginAgent {
                   + "because existed md5 of jar file for pipe plugin %s is 
different from the new jar file.",
               pluginName, pluginName);
       LOGGER.warn(errMsg);
-      throw new PipeManagementException(errMsg);
+      throw new PipeException(errMsg);
     }
 
     // if the pipe plugin is already registered and the jar file is the same, 
do nothing
@@ -125,10 +125,9 @@ public class PipePluginAgent {
    * the PipePluginClassLoader and its instance will be created to ensure that 
it can be loaded.
    *
    * @param pipePluginMeta the meta information of the PipePlugin
-   * @throws PipeManagementException if the PipePlugin can not be loaded or 
its instance can not be
-   *     created
+   * @throws PipeException if the PipePlugin can not be loaded or its instance 
can not be created
    */
-  public void doRegister(PipePluginMeta pipePluginMeta) throws 
PipeManagementException {
+  public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException {
     final String pluginName = pipePluginMeta.getPluginName();
     final String className = pipePluginMeta.getClassName();
 
@@ -155,7 +154,7 @@ public class PipePluginAgent {
               "Failed to register PipePlugin %s(%s), because its instance can 
not be constructed successfully. Exception: %s",
               pluginName.toUpperCase(), className, e);
       LOGGER.warn(errorMessage, e);
-      throw new PipeManagementException(errorMessage);
+      throw new PipeException(errorMessage);
     }
   }
 
@@ -175,7 +174,7 @@ public class PipePluginAgent {
         String errorMessage =
             String.format("Failed to deregister builtin PipePlugin %s.", 
pluginName);
         LOGGER.warn(errorMessage);
-        throw new PipeManagementException(errorMessage);
+        throw new PipeException(errorMessage);
       }
 
       // remove anyway
@@ -211,7 +210,7 @@ public class PipePluginAgent {
 
   public PipeConnector reflectConnector(PipeParameters connectorParameters) {
     if 
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
-      throw new PipeManagementException(
+      throw new PipeException(
           "Failed to reflect PipeConnector instance because 'connector' is not 
specified in the parameters.");
     }
     return (PipeConnector)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 16f3716bc13..69786989982 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -122,7 +122,7 @@ class PipeAgentLauncher {
           if 
(!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
             pipePluginMetaList.add(pipePluginMeta);
           }
-        } catch (PipeManagementException e) {
+        } catch (PipeException e) {
           pipePluginMetaList.add(pipePluginMeta);
         }
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
index 663ee0d8f43..bb247981bf6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
@@ -36,7 +36,7 @@ import 
org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -228,7 +228,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private void rethrowExceptionIfAny(AtomicReference<Exception> 
exceptionHolder) {
     if (exceptionHolder.get() != null) {
-      throw new PipeManagementException("failed to start collectors.", 
exceptionHolder.get());
+      throw new PipeException("failed to start collectors.", 
exceptionHolder.get());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index c4c21ac9114..785948b66a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -165,11 +165,6 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
     }
   }
 
-  @Override
-  public TsFileInsertionEvent 
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index 26c21d9545f..a6e338aae2e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -68,7 +67,7 @@ public class PipeConnectorSubtaskManager {
         // TODO: use runtimeConfiguration to configure PipeConnector
         pipeConnector.handshake();
       } catch (Exception e) {
-        throw new PipeManagementException(
+        throw new PipeException(
             "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
       }
 


Reply via email to