This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-package-structure
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e39a03e4389a0470bde06ce2e38d6ac858e2864f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 1 12:45:20 2024 +0800

    Pipe: refactor package structure
---
 .../consensus/statemachine/ConfigRegionStateMachine.java   |  2 +-
 .../org/apache/iotdb/confignode/manager/ConfigManager.java |  2 +-
 .../pipe/{transfer => }/agent/PipeConfigNodeAgent.java     | 10 +++++-----
 .../agent/plugin/PipeConfigNodePluginAgent.java            |  2 +-
 .../agent/plugin/PipeConfigRegionConnectorConstructor.java |  6 +++---
 .../agent/plugin/PipeConfigRegionExtractorConstructor.java |  4 ++--
 .../agent/plugin/PipeConfigRegionProcessorConstructor.java |  2 +-
 .../agent/receiver/IoTDBConfigNodeReceiverAgent.java       |  3 ++-
 .../agent/runtime/PipeConfigNodeRuntimeAgent.java          |  6 +++---
 .../agent/runtime/PipeConfigRegionListener.java            |  6 +++---
 .../{transfer => }/agent/task/PipeConfigNodeTaskAgent.java | 12 ++++++------
 .../connector/client/IoTDBConfigNodeSyncClientManager.java |  6 +++---
 .../payload}/PipeTransferConfigNodeHandshakeV1Req.java     |  2 +-
 .../payload}/PipeTransferConfigNodeHandshakeV2Req.java     |  2 +-
 .../payload}/PipeTransferConfigPlanReq.java                |  2 +-
 .../payload}/PipeTransferConfigSnapshotPieceReq.java       |  2 +-
 .../payload}/PipeTransferConfigSnapshotSealReq.java        |  2 +-
 .../protocol}/IoTDBConfigRegionAirGapConnector.java        | 12 ++++++------
 .../protocol}/IoTDBConfigRegionConnector.java              | 10 +++++-----
 .../pipe/coordinator/runtime/PipeHeartbeatScheduler.java   |  2 +-
 .../{transfer => }/execution/PipeConfigNodeSubtask.java    |  6 +++---
 .../execution/PipeConfigNodeSubtaskExecutor.java           |  2 +-
 .../extractor/ConfigRegionListeningFilter.java             |  2 +-
 .../extractor/ConfigRegionListeningQueue.java              |  2 +-
 .../extractor/IoTDBConfigRegionExtractor.java              |  4 ++--
 .../protocol}/IoTDBConfigNodeReceiver.java                 | 14 ++++++++------
 .../visitor}/PipeConfigPhysicalPlanExceptionVisitor.java   |  2 +-
 .../visitor}/PipeConfigPhysicalPlanTSStatusVisitor.java    |  2 +-
 .../pipe/{transfer => }/task/PipeConfigNodeTask.java       |  2 +-
 .../{transfer => }/task/PipeConfigNodeTaskBuilder.java     |  4 ++--
 .../pipe/{transfer => }/task/PipeConfigNodeTaskStage.java  |  6 +++---
 .../persistence/executor/ConfigPlanExecutor.java           |  2 +-
 .../apache/iotdb/confignode/persistence/pipe/PipeInfo.java |  2 +-
 .../org/apache/iotdb/confignode/service/ConfigNode.java    |  2 +-
 .../pipe/connector/IoTDBConfigRegionConnectorTest.java     |  2 +-
 .../{ => connector}/PipeConfigNodeThriftRequestTest.java   | 10 +++++-----
 .../pipe/execution/PipeConfigNodeSubtaskExecutorTest.java  |  2 --
 .../pipe/extractor/IoTDBConfigRegionExtractorTest.java     |  1 -
 .../persistence/ConfigRegionListeningQueueTest.java        |  2 +-
 .../main/java/org/apache/iotdb/db/audit/AuditLogger.java   |  2 +-
 .../db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java  |  6 +++---
 .../db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java    |  2 +-
 .../db/pipe/connector/payload/legacy/DeletionPipeData.java |  4 ++--
 .../iotdb/db/pipe/connector/payload/legacy/PipeData.java   |  2 +-
 .../db/pipe/connector/payload/legacy/TsFilePipeData.java   |  4 ++--
 .../{executor => }/PipeConnectorSubtaskExecutor.java       |  2 +-
 .../{executor => }/PipeProcessorSubtaskExecutor.java       |  2 +-
 .../{executor => }/PipeSubtaskExecutorManager.java         |  2 +-
 .../realtime/assigner/PipeDataRegionAssigner.java          |  4 ++--
 .../pattern/{matcher => }/CachedSchemaPatternMatcher.java  |  2 +-
 .../pipe/pattern/{matcher => }/PipeDataRegionMatcher.java  |  2 +-
 .../SimpleConsensusProgressIndexAssigner.java              |  2 +-
 .../{ => protocol}/airgap/IoTDBAirGapReceiver.java         |  4 ++--
 .../{ => protocol}/airgap/IoTDBAirGapReceiverAgent.java    |  2 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java               |  2 +-
 .../{ => protocol}/legacy/loader/DeletionLoader.java       |  2 +-
 .../receiver/{ => protocol}/legacy/loader/ILoader.java     |  2 +-
 .../{ => protocol}/legacy/loader/TsFileLoader.java         |  2 +-
 .../{ => protocol}/thrift/IoTDBDataNodeReceiver.java       |  8 ++++----
 .../{ => protocol}/thrift/IoTDBDataNodeReceiverAgent.java  |  2 +-
 .../receiver/{ => visitor}/PipePlanToStatementVisitor.java |  2 +-
 .../{ => visitor}/PipeStatementExceptionVisitor.java       |  2 +-
 .../{ => visitor}/PipeStatementTSStatusVisitor.java        |  2 +-
 .../db/pipe/task/builder/PipeDataNodeTaskBuilder.java      |  6 +++---
 .../iotdb/db/pipe/task/connection/PipeEventCollector.java  |  2 +-
 .../iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java   |  2 +-
 .../iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java   |  2 +-
 .../subtask/connector/PipeConnectorSubtaskLifeCycle.java   |  2 +-
 .../subtask/connector/PipeConnectorSubtaskManager.java     |  4 ++--
 .../execution/executor/SubscriptionSubtaskExecutor.java    |  2 +-
 .../task/stage/SubscriptionTaskConnectorStage.java         |  2 +-
 .../subtask/SubscriptionConnectorSubtaskLifeCycle.java     |  2 +-
 .../task/subtask/SubscriptionConnectorSubtaskManager.java  |  4 ++--
 .../apache/iotdb/db/pipe/connector/PipeReceiverTest.java   |  2 +-
 .../pipe/execution/PipeConnectorSubtaskExecutorTest.java   |  1 -
 .../pipe/execution/PipeProcessorSubtaskExecutorTest.java   |  1 -
 .../db/pipe/pattern/CachedSchemaPatternMatcherTest.java    |  1 -
 .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java |  2 +-
 .../iotdb/commons/pipe/metric/PipeEventCommitMetrics.java  |  2 +-
 .../progress/{committer => }/PipeEventCommitManager.java   |  2 +-
 .../pipe/progress/{committer => }/PipeEventCommitter.java  |  2 +-
 81 files changed, 134 insertions(+), 137 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index a0c6be6df9e..aca6ade457a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.persistence.schema.ConfignodeSnapshotParser;
 import org.apache.iotdb.confignode.service.ConfigNode;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a72e48a874b..17b969d928c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -90,8 +90,8 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.NodeMetrics;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
 import org.apache.iotdb.confignode.manager.subscription.SubscriptionManager;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/PipeConfigNodeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
similarity index 86%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/PipeConfigNodeAgent.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
index 1ea1cd0fbec..834b4e0ce35 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/PipeConfigNodeAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeAgent.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent;
+package org.apache.iotdb.confignode.manager.pipe.agent;
 
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.plugin.PipeConfigNodePluginAgent;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.receiver.IoTDBConfigNodeReceiverAgent;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.runtime.PipeConfigNodeRuntimeAgent;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.task.PipeConfigNodeTaskAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.plugin.PipeConfigNodePluginAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.receiver.IoTDBConfigNodeReceiverAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigNodeRuntimeAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
 import org.apache.iotdb.confignode.service.ConfigNode;
 
 /** {@link PipeConfigNodeAgent} is the entry point of the pipe module in {@link ConfigNode}. */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigNodePluginAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
similarity index 96%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigNodePluginAgent.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index d70a6825288..8e748c38b68 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigNodePluginAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.plugin;
+package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeConnectorConstructor;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeExtractorConstructor;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionConnectorConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java
similarity index 91%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionConnectorConstructor.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java
index 128f30bcbcc..3bcd1d6b749 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionConnectorConstructor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.plugin;
+package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeConnectorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector;
-import org.apache.iotdb.confignode.manager.pipe.transfer.connector.config.IoTDBConfigRegionAirGapConnector;
-import org.apache.iotdb.confignode.manager.pipe.transfer.connector.config.IoTDBConfigRegionConnector;
+import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionAirGapConnector;
+import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
 class PipeConfigRegionConnectorConstructor extends PipeConnectorConstructor {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionExtractorConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionExtractorConstructor.java
similarity index 92%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionExtractorConstructor.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionExtractorConstructor.java
index 41a93323016..beae07642fd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionExtractorConstructor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionExtractorConstructor.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.plugin;
+package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeExtractorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.donothing.DoNothingExtractor;
-import org.apache.iotdb.confignode.manager.pipe.transfer.extractor.IoTDBConfigRegionExtractor;
+import org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 
 class PipeConfigRegionExtractorConstructor extends PipeExtractorConstructor {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionProcessorConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionProcessorConstructor.java
similarity index 95%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionProcessorConstructor.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionProcessorConstructor.java
index 31bd1b9a669..1f9f84ffdef 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/plugin/PipeConfigRegionProcessorConstructor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionProcessorConstructor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.plugin;
+package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiverAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
similarity index 90%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiverAgent.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
index 1e8fe433c4c..2b5413172a7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiverAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.receiver;
+package org.apache.iotdb.confignode.manager.pipe.agent.receiver;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.pipe.receiver.protocol.IoTDBConfigNodeReceiver;
 
 import java.io.File;
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
similarity index 95%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigNodeRuntimeAgent.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index 6bb355aae9b..160c710ea4d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.runtime;
+package org.apache.iotdb.confignode.manager.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
@@ -27,9 +27,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue;
 import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeCopiedFileDirStartupCleaner;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
-import org.apache.iotdb.confignode.manager.pipe.transfer.extractor.ConfigRegionListeningQueue;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import org.slf4j.Logger;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigRegionListener.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
similarity index 90%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigRegionListener.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
index 785d27499e7..458ed1c57a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/runtime/PipeConfigRegionListener.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.runtime;
+package org.apache.iotdb.confignode.manager.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.confignode.manager.pipe.transfer.extractor.ConfigRegionListeningFilter;
-import org.apache.iotdb.confignode.manager.pipe.transfer.extractor.ConfigRegionListeningQueue;
+import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
+import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import java.io.IOException;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
similarity index 94%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/task/PipeConfigNodeTaskAgent.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 8429c5793ef..5f9614eceb2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/task/PipeConfigNodeTaskAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.task;
+package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
@@ -28,11 +28,11 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
-import org.apache.iotdb.confignode.manager.pipe.transfer.extractor.ConfigRegionListeningFilter;
-import org.apache.iotdb.confignode.manager.pipe.transfer.task.PipeConfigNodeTask;
-import org.apache.iotdb.confignode.manager.pipe.transfer.task.PipeConfigNodeTaskBuilder;
-import org.apache.iotdb.confignode.manager.pipe.transfer.task.PipeConfigNodeTaskStage;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
similarity index 86%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/client/IoTDBConfigNodeSyncClientManager.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index 21fd2b2dafe..e92f4879d4c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.client;
+package org.apache.iotdb.confignode.manager.pipe.connector.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV2Req;
-import org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV1Req;
-import org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV2Req;
+import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
+import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV2Req;
 import org.apache.iotdb.confignode.service.ConfigNode;
 
 import java.io.IOException;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV1Req.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV1Req.java
similarity index 96%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV1Req.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV1Req.java
index 18a8aa6c8e7..c161081782c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV1Req.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV1Req.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request;
+package org.apache.iotdb.confignode.manager.pipe.connector.payload;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV1Req;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV2Req.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV2Req.java
similarity index 96%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV2Req.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV2Req.java
index 849b60e016b..f19b94ffed2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigNodeHandshakeV2Req.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigNodeHandshakeV2Req.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request;
+package org.apache.iotdb.confignode.manager.pipe.connector.payload;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV2Req;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigPlanReq.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigPlanReq.java
similarity index 97%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigPlanReq.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigPlanReq.java
index 63db6619cd5..e51890567f4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigPlanReq.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigPlanReq.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request;
+package org.apache.iotdb.confignode.manager.pipe.connector.payload;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotPieceReq.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotPieceReq.java
similarity index 96%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotPieceReq.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotPieceReq.java
index ecd9e673beb..1be784d1a5e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotPieceReq.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotPieceReq.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request;
+package org.apache.iotdb.confignode.manager.pipe.connector.payload;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotSealReq.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotSealReq.java
similarity index 98%
rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotSealReq.java
rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotSealReq.java
index 4321a8f22e0..0397cff3a94 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/payload/request/PipeTransferConfigSnapshotSealReq.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/PipeTransferConfigSnapshotSealReq.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request;
+package org.apache.iotdb.confignode.manager.pipe.connector.payload;
 
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
similarity index 91%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionAirGapConnector.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index caa85c566a2..3914684e83c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -17,19 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.config;
+package org.apache.iotdb.confignode.manager.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV2Req;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV1Req;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV2Req;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigPlanReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotPieceReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotSealReq;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.pipe.api.event.Event;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
similarity index 93%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionConnector.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 58909ae9e53..1706ad85bbc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/connector/config/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.connector.config;
+package org.apache.iotdb.confignode.manager.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -25,12 +25,12 @@ import 
org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.client.IoTDBConfigNodeSyncClientManager;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.client.IoTDBConfigNodeSyncClientManager;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigPlanReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotPieceReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotSealReq;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
index 14fc9d83c2c..65217818e7d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
@@ -29,7 +29,7 @@ import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
similarity index 96%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index f845ddb1070..bbe0ead0edd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.execution;
+package org.apache.iotdb.confignode.manager.pipe.execution;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
@@ -28,8 +28,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.extractor.IoTDBConfigRegionExtractor;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+import 
org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtaskExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
similarity index 96%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtaskExecutor.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
index 2fe8ad1e875..1a34ab4783a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtaskExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.execution;
+package org.apache.iotdb.confignode.manager.pipe.execution;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
similarity index 99%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningFilter.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index f97954c71ec..84cebf259fb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.extractor;
+package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningQueue.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
similarity index 99%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningQueue.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
index 1cd367ec6a3..48c7edd2c29 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/ConfigRegionListeningQueue.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningQueue.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.extractor;
+package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.auth.user.LocalFileUserAccessor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/IoTDBConfigRegionExtractor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
similarity index 96%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/IoTDBConfigRegionExtractor.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
index aa414bd44db..91d8a21b70c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/extractor/IoTDBConfigRegionExtractor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.extractor;
+package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import 
org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
@@ -25,9 +25,9 @@ import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
 import org.apache.iotdb.commons.pipe.extractor.IoTDBNonDataRegionExtractor;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
similarity index 95%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiver.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 6d93c57ab1b..864c04789ac 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.receiver;
+package org.apache.iotdb.confignode.manager.pipe.receiver.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -44,12 +44,14 @@ import 
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSch
 import 
org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV2Req;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV1Req;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV2Req;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigPlanReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotPieceReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotSealReq;
+import 
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor;
+import 
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor;
 import org.apache.iotdb.confignode.persistence.schema.CNPhysicalPlanGenerator;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
 import org.apache.iotdb.confignode.persistence.schema.ConfignodeSnapshotParser;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanExceptionVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanExceptionVisitor.java
similarity index 96%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanExceptionVisitor.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanExceptionVisitor.java
index 1012873f598..a258256d035 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanExceptionVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanExceptionVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.receiver;
+package org.apache.iotdb.confignode.manager.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
similarity index 99%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 8ac227396ff..7046f3d8dc6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.agent.receiver;
+package org.apache.iotdb.confignode.manager.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTask.java
similarity index 95%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTask.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTask.java
index 6da0c405a83..7eea3b8482e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTask.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.task;
+package org.apache.iotdb.confignode.manager.pipe.task;
 
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskBuilder.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskBuilder.java
similarity index 94%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskBuilder.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskBuilder.java
index 8433126010d..31f5aa1ea9b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskBuilder.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskBuilder.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.task;
+package org.apache.iotdb.confignode.manager.pipe.task;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.extractor.ConfigRegionListeningFilter;
+import 
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskStage.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
similarity index 90%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskStage.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
index 647e8163a6b..36eec65b69e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/task/PipeConfigNodeTaskStage.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.transfer.task;
+package org.apache.iotdb.confignode.manager.pipe.task;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.execution.PipeConfigNodeSubtask;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.execution.PipeConfigNodeSubtaskExecutor;
+import 
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
+import 
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtaskExecutor;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.Map;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 9cbb89ae9e6..99b4fe3d31e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -113,7 +113,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
 import 
org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
 import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ClusterInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index 31887d0c5fe..ba3981b67f1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -29,7 +29,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 5268757cc12..b92918f58a5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -45,8 +45,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/IoTDBConfigRegionConnectorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/IoTDBConfigRegionConnectorTest.java
index a180365607b..30ed0ee7580 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/IoTDBConfigRegionConnectorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/IoTDBConfigRegionConnectorTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.connector;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.config.IoTDBConfigRegionConnector;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/PipeConfigNodeThriftRequestTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/PipeConfigNodeThriftRequestTest.java
similarity index 89%
rename from 
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/PipeConfigNodeThriftRequestTest.java
rename to 
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/PipeConfigNodeThriftRequestTest.java
index 6cc1d0d9753..934fa214e06 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/PipeConfigNodeThriftRequestTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/connector/PipeConfigNodeThriftRequestTest.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe;
+package org.apache.iotdb.confignode.manager.pipe.connector;
 
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigNodeHandshakeV1Req;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigPlanReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotPieceReq;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.connector.payload.request.PipeTransferConfigSnapshotSealReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
+import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
 
 import org.junit.Assert;
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutorTest.java
index 430ed022107..16d949d1dcb 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutorTest.java
@@ -25,8 +25,6 @@ import 
org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.execution.PipeConfigNodeSubtask;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.execution.PipeConfigNodeSubtaskExecutor;
 
 import org.junit.After;
 import org.junit.Assert;
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractorTest.java
index 6779b16a51c..46f514dbaf1 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractorTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.extractor.IoTDBConfigRegionExtractor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
index 62e3749677f..f9e72c59d6e 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ConfigRegionListeningQueueTest.java
@@ -25,8 +25,8 @@ import 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
-import 
org.apache.iotdb.confignode.manager.pipe.transfer.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.pipe.api.event.Event;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index b626f16f237..6ddcb5d4b25 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -49,7 +49,7 @@ import javax.validation.constraints.NotNull;
 import java.time.ZoneId;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.pipe.receiver.legacy.loader.ILoader.SCHEMA_FETCHER;
+import static 
org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.ILoader.SCHEMA_FETCHER;
 
 public class AuditLogger {
   private static final Logger logger = 
LoggerFactory.getLogger(AuditLogger.class);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
index e583644886f..555696d4147 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.pipe.agent.receiver;
 
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.receiver.airgap.IoTDBAirGapReceiverAgent;
-import org.apache.iotdb.db.pipe.receiver.legacy.IoTDBLegacyPipeReceiverAgent;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBDataNodeReceiverAgent;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.airgap.IoTDBAirGapReceiverAgent;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index bcaa258502b..c7dc2d9c39d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
-import 
org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner;
+import org.apache.iotdb.db.pipe.progress.SimpleConsensusProgressIndexAssigner;
 import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/DeletionPipeData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/DeletionPipeData.java
index 6a95583674b..463213a408c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/DeletionPipeData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/DeletionPipeData.java
@@ -21,8 +21,8 @@
 package org.apache.iotdb.db.pipe.connector.payload.legacy;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.pipe.receiver.legacy.loader.DeletionLoader;
-import org.apache.iotdb.db.pipe.receiver.legacy.loader.ILoader;
+import org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.DeletionLoader;
+import org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.ILoader;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/PipeData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/PipeData.java
index 36e8dfa3ea5..d6417687a26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/PipeData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/PipeData.java
@@ -20,7 +20,7 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.legacy;
 
-import org.apache.iotdb.db.pipe.receiver.legacy.loader.ILoader;
+import org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.ILoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/TsFilePipeData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/TsFilePipeData.java
index 1bad5946cab..87de0cd8430 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/TsFilePipeData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/legacy/TsFilePipeData.java
@@ -20,8 +20,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.legacy;
 
-import org.apache.iotdb.db.pipe.receiver.legacy.loader.ILoader;
-import org.apache.iotdb.db.pipe.receiver.legacy.loader.TsFileLoader;
+import org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.ILoader;
+import org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader.TsFileLoader;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
index 044ad6224ba..ce832091c1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.execution;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
index 410dd15ae84..a4b36d3664c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.execution;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeSubtaskExecutorManager.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeSubtaskExecutorManager.java
index f10077a659a..1792a1015d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeSubtaskExecutorManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.execution;
 
 import 
org.apache.iotdb.db.subscription.execution.executor.SubscriptionSubtaskExecutor;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index bddcfc00ebe..e85fd5b0a1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -25,8 +25,8 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
-import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
-import org.apache.iotdb.db.pipe.pattern.matcher.PipeDataRegionMatcher;
+import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher;
 
 import java.io.Closeable;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
index 470c7ba110f..1de43b7042e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.pattern.matcher;
+package org.apache.iotdb.db.pipe.pattern;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/PipeDataRegionMatcher.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/PipeDataRegionMatcher.java
index 4be05cc3a81..5132d6fb494 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/PipeDataRegionMatcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.pattern.matcher;
+package org.apache.iotdb.db.pipe.pattern;
 
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/assigner/SimpleConsensusProgressIndexAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/SimpleConsensusProgressIndexAssigner.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/assigner/SimpleConsensusProgressIndexAssigner.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/SimpleConsensusProgressIndexAssigner.java
index 855330b7c6a..0105acd434c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/assigner/SimpleConsensusProgressIndexAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/progress/SimpleConsensusProgressIndexAssigner.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.progress.assigner;
+package org.apache.iotdb.db.pipe.progress;
 
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 12f7634ac8f..f003219c781 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.WrappedRunnable;
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageCon
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBDataNodeReceiverAgent;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
index 497ae6ce834..6db65b00a32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index 30481ae9cf2..e2e57763432 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -18,7 +18,7 @@
  *
  */
 
-package org.apache.iotdb.db.pipe.receiver.legacy;
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
index f5711a826ab..19dc268acec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.legacy.loader;
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.auth.AuthorityChecker;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/ILoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/ILoader.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/ILoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/ILoader.java
index eb759ab82ed..22f9f4d44b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/ILoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/ILoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.legacy.loader;
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader;
 
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index 080f5f105ec..9b9e0705f21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.legacy.loader;
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy.loader;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index d704353c0f2..783069c566a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.thrift;
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -46,9 +46,9 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
-import org.apache.iotdb.db.pipe.receiver.PipePlanToStatementVisitor;
-import org.apache.iotdb.db.pipe.receiver.PipeStatementExceptionVisitor;
-import org.apache.iotdb.db.pipe.receiver.PipeStatementTSStatusVisitor;
+import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
+import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
+import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiverAgent.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 113b0a00278..d2dfb1309e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.thrift;
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index 37b66c93e51..1c1317ee94a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver;
+package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementExceptionVisitor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 1598d6e15d7..c99b2fb41dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver;
+package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 2fcb6c76f8c..52a0c963278 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver;
+package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index e3e0d36429e..7871038a123 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -24,9 +24,9 @@ import 
org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeType;
-import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeProcessorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index d3989915120..bc6f1bb6291 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 8ab1e10d0b7..becb0c78dd7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 9fd7fcfe2fa..2540debdd8d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
 import org.apache.iotdb.db.storageengine.StorageEngine;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 0d1ddc57f8b..8391f4c7c81 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 1089ef9ceaa..124260a434e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -25,10 +25,10 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeConnector;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/execution/executor/SubscriptionSubtaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/execution/executor/SubscriptionSubtaskExecutor.java
index 9ef5e6b07f8..84a8e08d232 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/execution/executor/SubscriptionSubtaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/execution/executor/SubscriptionSubtaskExecutor.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.subscription.execution.executor;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 
 public class SubscriptionSubtaskExecutor extends PipeConnectorSubtaskExecutor {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
index 85f511f5d2b..1bbcc153beb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.subscription.task.stage;
 
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtaskManager;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index ac25e01f3f4..8192d13e811 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.subscription.task.subtask;
 
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index fd10dc272db..ab5040f04ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -25,10 +25,10 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
index b2673de2ded..7c7d74825b0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.connector;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBDataNodeReceiver;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
index f3db55a9f32..e1f5063eacc 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.execution;
 
 import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
index 8c1db1384eb..53e62831dfe 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.execution;
 
 import org.apache.iotdb.commons.pipe.task.EventSupplier;
-import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 609dd9b3f44..3710173892d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeE
 import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 5c6cc6c262d..a782db82adb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.commons.pipe.event;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
-import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.pipe.api.event.Event;
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCommitMetrics.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCommitMetrics.java
index c6ed82ad215..e7ab3ef4248 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCommitMetrics.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCommitMetrics.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.metric;
 
-import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitter;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitter;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.metrics.AbstractMetricService;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
similarity index 98%
rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitManager.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 75589b80edd..89eb7b2868c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.progress.committer;
+package org.apache.iotdb.commons.pipe.progress;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
similarity index 98%
rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitter.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index 1d9002c0fcb..04664baf1ea 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/committer/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.progress.committer;
+package org.apache.iotdb.commons.pipe.progress;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;

Reply via email to