This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 3c0d8264bd8 [IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)
3c0d8264bd8 is described below

commit 3c0d8264bd80cfd8e2dced092acc11be6c22cd18
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 22 17:07:35 2023 +0800

    [IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)
---
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   1 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   1 +
 .../dataregion/DataExecutionVisitor.java           |  23 ++
 .../config/constant/PipeExtractorConstant.java     |   4 +
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   2 +
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  17 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  17 +-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |   8 +-
 .../event/realtime/PipeRealtimeEventFactory.java   |  10 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   2 +
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  10 +
 .../realtime/assigner/PipeDataRegionAssigner.java  |   4 +
 .../listener/PipeInsertionDataNodeListener.java    |   6 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  12 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +-
 .../execution/executor/RegionWriteExecutor.java    |   7 +
 .../execution/load/LoadTsFileManager.java          |  12 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  42 ++++
 .../queryengine/plan/execution/QueryExecution.java |   4 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |  10 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  10 +-
 .../config/executor/IConfigTaskExecutor.java       |  10 +-
 .../execution/config/sys/pipe/CreatePipeTask.java  |   2 +-
 .../execution/config/sys/pipe/DropPipeTask.java    |   2 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |   2 +-
 .../execution/config/sys/pipe/StartPipeTask.java   |   2 +-
 .../execution/config/sys/pipe/StopPipeTask.java    |   2 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |  10 +-
 .../plan/planner/LogicalPlanVisitor.java           |  45 ++++
 .../plan/planner/plan/node/PlanNodeType.java       |   7 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../planner/plan/node/load/LoadTsFileNode.java     |   4 -
 .../plan/node/write/InsertMultiTabletsNode.java    |   6 +
 .../plan/planner/plan/node/write/InsertNode.java   |  12 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   6 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   6 +
 .../plan/node/write/PipeEnrichedInsertNode.java    | 276 +++++++++++++++++++++
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  11 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  11 +-
 .../queryengine/plan/statement/StatementType.java  |   1 +
 .../plan/statement/StatementVisitor.java           |  58 +++--
 .../plan/statement/crud/InsertBaseStatement.java   |   3 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |  11 +
 .../crud/PipeEnrichedInsertBaseStatement.java      | 224 +++++++++++++++++
 .../crud/PipeEnrichedLoadTsFileStatement.java      | 132 ++++++++++
 .../pipe/CreatePipeStatement.java                  |   2 +-
 .../{sys => metadata}/pipe/DropPipeStatement.java  |   2 +-
 .../{sys => metadata}/pipe/ShowPipesStatement.java |   2 +-
 .../{sys => metadata}/pipe/StartPipeStatement.java |   2 +-
 .../{sys => metadata}/pipe/StopPipeStatement.java  |   2 +-
 .../iotdb/db/storageengine/StorageEngine.java      |   5 +-
 .../db/storageengine/dataregion/DataRegion.java    |   7 +-
 .../dataregion/memtable/TsFileProcessor.java       |   3 +-
 .../quotas/DataNodeThrottleQuotaManager.java       |   1 +
 .../rescon/quotas/DefaultOperationQuota.java       |  14 +-
 .../db/trigger/executor/TriggerFireVisitor.java    |  20 ++
 .../db/pipe/extractor/PipeRealtimeExtractTest.java |   3 +-
 .../plan/statement/sys/pipe/PipeStatementTest.java |  10 +-
 .../scheduler/load/LoadTsFileSchedulerTest.java    |   3 +-
 .../thrift/src/main/thrift/datanode.thrift         |   1 +
 60 files changed, 1029 insertions(+), 106 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 5c7b9da4923..fa1fd88dbd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -205,6 +205,7 @@ public class AuditLogger {
       case BATCH_INSERT_ROWS:
       case BATCH_INSERT_ONE_DEVICE:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
       case DELETE:
       case SELECT_INTO:
       case LOAD_FILES:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 674f09d2bb1..2b6298feff9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -209,6 +209,7 @@ public class AuthorityChecker {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
         return PrivilegeType.INSERT_TIMESERIES.ordinal();
       case LIST_ROLE:
       case LIST_ROLE_USERS:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 28384aee64b..1e647995a57 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -28,10 +28,12 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -164,6 +166,27 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
     }
   }
 
+  @Override
+  public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, 
DataRegion context) {
+    final InsertNode realInsertNode = node.getInsertNode();
+
+    realInsertNode.markAsGeneratedByPipe();
+
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else {
+      return visitPlan(realInsertNode, context);
+    }
+  }
+
   @Override
   public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index b92b502ae19..6a7c095c148 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -26,6 +26,10 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
   public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
 
+  public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
+      "extractor.forwarding-pipe-requests";
+  public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE 
= true;
+
   public static final String EXTRACTOR_HISTORY_ENABLE_KEY = 
"extractor.history.enable";
   public static final String EXTRACTOR_HISTORY_START_TIME = 
"extractor.history.start-time";
   public static final String EXTRACTOR_HISTORY_END_TIME = 
"extractor.history.end-time";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index ddd0c78877c..ef49f2bf8c9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -134,4 +134,6 @@ public abstract class EnrichedEvent implements Event {
   public void reportException(PipeRuntimeException pipeRuntimeException) {
     PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
   }
+
+  public abstract boolean isGeneratedByPipe();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 7162cdabeca..739a2d88292 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -46,24 +46,30 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   private final WALEntryHandler walEntryHandler;
   private final ProgressIndex progressIndex;
   private final boolean isAligned;
+  private final boolean isGeneratedByPipe;
 
   private TabletInsertionDataContainer dataContainer;
 
   public PipeInsertNodeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler, ProgressIndex progressIndex, boolean 
isAligned) {
-    this(walEntryHandler, progressIndex, isAligned, null, null);
+      WALEntryHandler walEntryHandler,
+      ProgressIndex progressIndex,
+      boolean isAligned,
+      boolean isGeneratedByPipe) {
+    this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null, 
null);
   }
 
   private PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler,
       ProgressIndex progressIndex,
       boolean isAligned,
+      boolean isGeneratedByPipe,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
     super(pipeTaskMeta, pattern);
     this.walEntryHandler = walEntryHandler;
     this.progressIndex = progressIndex;
     this.isAligned = isAligned;
+    this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
   public InsertNode getInsertNode() throws WALPipeException {
@@ -111,7 +117,12 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeInsertNodeTabletInsertionEvent(
-        walEntryHandler, progressIndex, isAligned, pipeTaskMeta, pattern);
+        walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, 
pipeTaskMeta, pattern);
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index bb260bf22ac..5705f5c8600 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -48,16 +48,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private final TsFileResource resource;
   private File tsFile;
 
+  private final boolean isGeneratedByPipe;
+
   private final AtomicBoolean isClosed;
 
   private TsFileInsertionDataContainer dataContainer;
 
-  public PipeTsFileInsertionEvent(TsFileResource resource) {
-    this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+  public PipeTsFileInsertionEvent(TsFileResource resource, boolean 
isGeneratedByPipe) {
+    this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE);
   }
 
   public PipeTsFileInsertionEvent(
       TsFileResource resource,
+      boolean isGeneratedByPipe,
       PipeTaskMeta pipeTaskMeta,
       String pattern,
       long startTime,
@@ -70,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     this.resource = resource;
     tsFile = resource.getTsFile();
 
+    this.isGeneratedByPipe = isGeneratedByPipe;
+
     isClosed = new AtomicBoolean(resource.isClosed());
     // register close listener if TsFile is not closed
     if (!isClosed.get()) {
@@ -153,7 +158,13 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern, 
startTime, endTime);
+    return new PipeTsFileInsertionEvent(
+        resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime, 
endTime);
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 5db7b20bde0..38f318cd05d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
-import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 
@@ -69,7 +68,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     this.device2Measurements = device2Measurements;
   }
 
-  public Event getEvent() {
+  public EnrichedEvent getEvent() {
     return event;
   }
 
@@ -129,6 +128,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
         pattern);
   }
 
+  @Override
+  public boolean isGeneratedByPipe() {
+    return event.isGeneratedByPipe();
+  }
+
   @Override
   public String toString() {
     return "PipeRealtimeEvent{"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index f542d8c6647..6af720da2f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -30,16 +30,20 @@ public class PipeRealtimeEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
-  public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource) 
{
+  public static PipeRealtimeEvent createRealtimeEvent(
+      TsFileResource resource, boolean isGeneratedByPipe) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
-        new PipeTsFileInsertionEvent(resource), resource);
+        new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         new PipeInsertNodeTabletInsertionEvent(
-            walEntryHandler, insertNode.getProgressIndex(), 
insertNode.isAligned()),
+            walEntryHandler,
+            insertNode.getProgressIndex(),
+            insertNode.isAligned(),
+            insertNode.isGeneratedByPipe()),
         insertNode,
         resource);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a6c6b4d60ee..506dd6adcd7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -179,6 +179,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
+                            false,
                             pipeTaskMeta,
                             pattern,
                             historicalDataExtractionStartTime,
@@ -195,6 +196,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
+                            false,
                             pipeTaskMeta,
                             pattern,
                             historicalDataExtractionStartTime,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 9fd72fb7b67..22658fb3b42 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -32,6 +32,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor 
{
 
   protected String pattern;
+  protected boolean isForwardingPipeRequests;
+
   protected String dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
@@ -51,6 +53,10 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
         parameters.getStringOrDefault(
             PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
             PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    isForwardingPipeRequests =
+        parameters.getBooleanOrDefault(
+            PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+            
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
 
     final PipeTaskExtractorRuntimeEnvironment environment =
         (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
@@ -79,6 +85,10 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     return pattern;
   }
 
+  public final boolean isForwardingPipeRequests() {
+    return isForwardingPipeRequests;
+  }
+
   public final PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index ee569e9e4ff..1177634be9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -47,6 +47,10 @@ public class PipeDataRegionAssigner {
         .match(event)
         .forEach(
             extractor -> {
+              if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
+                return;
+              }
+
               final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
                       extractor.getPipeTaskMeta(), extractor.getPattern());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..35648b1bb5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -91,7 +91,8 @@ public class PipeInsertionDataNodeListener {
 
   //////////////////////////// listen to events ////////////////////////////
 
-  public void listenToTsFile(String dataRegionId, TsFileResource 
tsFileResource) {
+  public void listenToTsFile(
+      String dataRegionId, TsFileResource tsFileResource, boolean 
isGeneratedByPipe) {
     // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on 
purpose
     // because extractors may use tsfile events when some exceptions occur in 
the
     // insert nodes listening process.
@@ -103,7 +104,8 @@ public class PipeInsertionDataNodeListener {
       return;
     }
 
-    
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource));
+    assigner.publishToAssign(
+        PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, 
isGeneratedByPipe));
   }
 
   public void listenToInsertNode(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 2aebbd7a3e5..09d6870f489 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -38,8 +38,11 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -424,12 +427,17 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    final long queryId = SessionManager.getInstance().requestQueryId();
+    if (statement instanceof InsertBaseStatement) {
+      statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) 
statement);
+    } else if (statement instanceof LoadTsFileStatement) {
+      statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement) 
statement);
+    }
+
     final ExecutionResult result =
         Coordinator.getInstance()
             .execute(
                 statement,
-                queryId,
+                SessionManager.getInstance().requestQueryId(),
                 null,
                 "",
                 partitionFetcher,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 86647205bc9..446f71e2cb3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -404,12 +404,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-
-    TSStatus resultStatus =
+    return createTLoadResp(
         StorageEngine.getInstance()
             .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[req.commandType], 
req.uuid);
-    return createTLoadResp(resultStatus);
+                LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+                req.uuid,
+                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe));
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 1b94e607282..c60787dd737 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -222,6 +223,12 @@ public class RegionWriteExecutor {
       return executeDataInsert(node, context);
     }
 
+    @Override
+    public RegionExecutionResult visitPipeEnrichedInsert(
+        PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
     private RegionExecutionResult executeDataInsert(
         InsertNode insertNode, WritePlanNodeExecutionContext context) {
       RegionExecutionResult response = new RegionExecutionResult();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index b1ab2b77e6b..97542ceab24 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -133,11 +133,12 @@ public class LoadTsFileManager {
     }
   }
 
-  public boolean loadAll(String uuid) throws IOException, LoadFileException {
+  public boolean loadAll(String uuid, boolean isGeneratedByPipe)
+      throws IOException, LoadFileException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll();
+    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
     clean(uuid);
     return true;
   }
@@ -249,7 +250,7 @@ public class LoadTsFileManager {
       }
     }
 
-    private void loadAll() throws IOException, LoadFileException {
+    private void loadAll(boolean isGeneratedByPipe) throws IOException, 
LoadFileException {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
@@ -259,7 +260,10 @@ public class LoadTsFileManager {
           writer.endChunkGroup();
         }
         writer.endFile();
-        entry.getKey().getDataRegion().loadNewTsFile(generateResource(writer), 
true);
+        entry
+            .getKey()
+            .getDataRegion()
+            .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
       }
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 5f56411ac91..7135a8ddb9e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -98,6 +98,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -2425,6 +2427,37 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
+  @Override
+  public Analysis visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
+    Analysis analysis;
+
+    final InsertBaseStatement insertBaseStatement =
+        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+    if (insertBaseStatement instanceof InsertTabletStatement) {
+      analysis = visitInsertTablet((InsertTabletStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+      analysis =
+          visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowStatement) {
+      analysis = visitInsertRow((InsertRowStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertRowsStatement) {
+      analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+      analysis =
+          visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement) 
insertBaseStatement, context);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
+    }
+
+    // statement may be changed because of logical view
+    pipeEnrichedInsertBaseStatement.setInsertBaseStatement(
+        (InsertBaseStatement) analysis.getStatement());
+    analysis.setStatement(pipeEnrichedInsertBaseStatement);
+    return analysis;
+  }
+
   private void validateSchema(
       Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext 
context) {
     final long startTime = System.nanoTime();
@@ -2478,6 +2511,15 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         .analyzeFileByFile();
   }
 
+  @Override
+  public Analysis visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
+    final Analysis analysis =
+        
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
+    analysis.setStatement(pipeEnrichedLoadTsFileStatement);
+    return analysis;
+  }
+
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index f7bca3e2a73..14765cfe2aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -64,6 +64,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -322,7 +323,8 @@ public class QueryExecution implements IQueryExecution {
               context,
               stateMachine,
               syncInternalServiceClientManager,
-              partitionFetcher);
+              partitionFetcher,
+              rawStatement instanceof PipeEnrichedLoadTsFileStatement);
       this.scheduler.start();
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
index f98bd503882..57b6a3cbad9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
@@ -119,6 +119,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -138,11 +143,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a2c4783e266..1f25a869d04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -160,6 +160,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModelStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -174,11 +179,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index a59ada9d88b..895f81cd9be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -44,6 +44,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModelStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -57,11 +62,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.AlterLogical
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index 5d71693f6bc..f0213c7d4f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
index ec074a0de87..f3449f36990 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 1e7dc50be2c..e5343e3c8eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
index 9b9539c732f..62b55bb017c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
index a0e495cc8bd..c18070a5027 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index c03775b3d39..4ea3445b14b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -160,6 +160,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
@@ -185,11 +190,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 86b98eda96a..372afb78488 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -42,21 +42,26 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Mea
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -487,12 +492,52 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     return insertNode;
   }
 
+  @Override
+  public PlanNode visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
+    InsertNode insertNode;
+
+    final InsertBaseStatement insertBaseStatement =
+        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+    if (insertBaseStatement instanceof InsertRowStatement) {
+      insertNode =
+          (InsertRowNode) visitInsertRow((InsertRowStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowsStatement) {
+      insertNode =
+          (InsertRowsNode) visitInsertRows((InsertRowsStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+      insertNode =
+          (InsertRowsOfOneDeviceNode)
+              visitInsertRowsOfOneDevice(
+                  (InsertRowsOfOneDeviceStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertTabletStatement) {
+      insertNode =
+          (InsertTabletNode)
+              visitInsertTablet((InsertTabletStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+      insertNode =
+          (InsertMultiTabletsNode)
+              visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
+    }
+
+    return new PipeEnrichedInsertNode(insertNode);
+  }
+
   @Override
   public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, 
MPPQueryContext context) {
     return new LoadTsFileNode(
         context.getQueryId().genPlanNodeId(), 
loadTsFileStatement.getResources());
   }
 
+  @Override
+  public PlanNode visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
+    return 
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
+  }
+
   @Override
   public PlanNode visitShowTimeSeries(
       ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index d2379d1435d..2364aef9d7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -93,6 +93,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
@@ -177,7 +178,9 @@ public enum PlanNodeType {
   ROLLBACK_LOGICAL_VIEW_BLACK_LIST((short) 75),
   DELETE_LOGICAL_VIEW((short) 76),
   LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
-  ALTER_LOGICAL_VIEW((short) 78);
+  ALTER_LOGICAL_VIEW((short) 78),
+  PIPE_ENRICHED_INSERT((short) 79),
+  ;
 
   public static final int BYTES = Short.BYTES;
 
@@ -380,6 +383,8 @@ public enum PlanNodeType {
         return LogicalViewSchemaScanNode.deserialize(buffer);
       case 78:
         return AlterLogicalViewNode.deserialize(buffer);
+      case 79:
+        return PipeEnrichedInsertNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index c603af07f0e..e6eb22fc5a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -94,6 +94,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 
 public abstract class PlanVisitor<R, C> {
 
@@ -423,6 +424,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitDeleteData(DeleteDataNode node, C context) {
     return visitPlan(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index f304678fc3c..793e6139693 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -40,10 +40,6 @@ public class LoadTsFileNode extends WritePlanNode {
 
   private final List<TsFileResource> resources;
 
-  public LoadTsFileNode(PlanNodeId id) {
-    this(id, new ArrayList<>());
-  }
-
   public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
     super(id);
     this.resources = resources;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 167019fe361..69af99b48cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -240,6 +240,12 @@ public class InsertMultiTabletsNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index fba7c0574ca..abca512e1e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   protected ProgressIndex progressIndex;
 
+  protected boolean isGeneratedByPipe = false;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -169,6 +171,14 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
   }
 
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
+  }
+
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+  }
+
   // region Serialization methods for WAL
   /** Serialized size of measurement schemas, ignoring failed time series */
   protected int serializeMeasurementSchemasSize() {
@@ -274,7 +284,7 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   // region progress index
 
   @Override
-  public final ProgressIndex getProgressIndex() {
+  public ProgressIndex getProgressIndex() {
     return progressIndex;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 61949962285..20394f14ae0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -202,6 +202,12 @@ public class InsertRowsNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 28b436c6237..a05e2080f95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -269,6 +269,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
new file mode 100644
index 00000000000..c2988beafe5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeEnrichedInsertNode extends InsertNode {
+
+  private final InsertNode insertNode;
+
+  public PipeEnrichedInsertNode(InsertNode insertNode) {
+    super(insertNode.getPlanNodeId());
+    this.insertNode = insertNode;
+  }
+
+  public InsertNode getInsertNode() {
+    return insertNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return insertNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    insertNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return insertNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(PlanNodeId id) {
+    insertNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return insertNode.getChildren();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    insertNode.addChild(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new PipeEnrichedInsertNode((InsertNode) insertNode.clone());
+  }
+
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new PipeEnrichedInsertNode(
+        (InsertNode) insertNode.createSubNode(subNodeId, startIndex, endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    return new PipeEnrichedInsertNode((InsertNode) insertNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return insertNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return insertNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedInsert(this, context);
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    return insertNode.splitByPartition(analysis).stream()
+        .map(
+            plan ->
+                plan instanceof PipeEnrichedInsertNode
+                    ? plan
+                    : new PipeEnrichedInsertNode((InsertNode) plan))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public TRegionReplicaSet getDataRegionReplicaSet() {
+    return insertNode.getDataRegionReplicaSet();
+  }
+
+  @Override
+  public void setDataRegionReplicaSet(TRegionReplicaSet dataRegionReplicaSet) {
+    insertNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return insertNode.getDevicePath();
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    insertNode.setDevicePath(devicePath);
+  }
+
+  @Override
+  public boolean isAligned() {
+    return insertNode.isAligned();
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    insertNode.setAligned(aligned);
+  }
+
+  @Override
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return insertNode.getMeasurementSchemas();
+  }
+
+  @Override
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    insertNode.setMeasurementSchemas(measurementSchemas);
+  }
+
+  @Override
+  public String[] getMeasurements() {
+    return insertNode.getMeasurements();
+  }
+
+  @Override
+  public TSDataType[] getDataTypes() {
+    return insertNode.getDataTypes();
+  }
+
+  @Override
+  public TSDataType getDataType(int index) {
+    return insertNode.getDataType(index);
+  }
+
+  @Override
+  public void setDataTypes(TSDataType[] dataTypes) {
+    insertNode.setDataTypes(dataTypes);
+  }
+
+  @Override
+  public IDeviceID getDeviceID() {
+    return insertNode.getDeviceID();
+  }
+
+  @Override
+  public void setDeviceID(IDeviceID deviceID) {
+    insertNode.setDeviceID(deviceID);
+  }
+
+  @Override
+  public long getSearchIndex() {
+    return insertNode.getSearchIndex();
+  }
+
+  @Override
+  public void setSearchIndex(long searchIndex) {
+    insertNode.setSearchIndex(searchIndex);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_INSERT.serialize(byteBuffer);
+    insertNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.PIPE_ENRICHED_INSERT.serialize(stream);
+    insertNode.serialize(stream);
+  }
+
+  public static PlanNode deserialize(ByteBuffer buffer) {
+    return new PipeEnrichedInsertNode((InsertNode) PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return insertNode.getRegionReplicaSet();
+  }
+
+  @Override
+  public long getMinTime() {
+    return insertNode.getMinTime();
+  }
+
+  @Override
+  public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
+    return insertNode.isSyncFromLeaderWhenUsingIoTConsensus();
+  }
+
+  @Override
+  public void markFailedMeasurement(int index) {
+    insertNode.markFailedMeasurement(index);
+  }
+
+  @Override
+  public boolean hasValidMeasurements() {
+    return insertNode.hasValidMeasurements();
+  }
+
+  @Override
+  public void setFailedMeasurementNumber(int failedMeasurementNumber) {
+    insertNode.setFailedMeasurementNumber(failedMeasurementNumber);
+  }
+
+  @Override
+  public int getFailedMeasurementNumber() {
+    return insertNode.getFailedMeasurementNumber();
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    insertNode.setProgressIndex(progressIndex);
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return insertNode.getProgressIndex();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PipeEnrichedInsertNode
+        && insertNode.equals(((PipeEnrichedInsertNode) o).insertNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return insertNode.hashCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 075c6e1433a..620920a01a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -68,16 +68,19 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
   private final ExecutorService executor;
+  private final boolean isGeneratedByPipe;
 
   private static final String NODE_CONNECTION_ERROR = "can't connect to node {}";
 
   public LoadTsFileDispatcherImpl(
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+      boolean isGeneratedByPipe) {
     this.internalServiceClientManager = internalServiceClientManager;
     this.localhostIpAddr = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
     this.executor =
         
IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
+    this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
   public void setUuid(String uuid) {
@@ -178,7 +181,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         StorageEngine.getInstance()
             .executeLoadCommand(
                 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid);
+                loadCommandReq.uuid,
+                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe);
     if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
       throw new FragmentInstanceDispatchException(resultStatus);
     }
@@ -211,7 +215,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
             .getDataRegion((DataRegionId) groupId)
             .loadNewTsFile(
                 ((LoadSingleTsFileNode) planNode).getTsFileResource(),
-                ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad());
+                ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
+                isGeneratedByPipe);
       } catch (LoadFileException e) {
         logger.warn(String.format("Load TsFile Node %s error.", planNode), e);
         TSStatus resultStatus = new TSStatus();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 9127eead0ad..834829c3849 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -102,22 +102,24 @@ public class LoadTsFileScheduler implements IScheduler {
   private final DataPartitionBatchFetcher partitionFetcher;
   private final List<LoadSingleTsFileNode> tsFileNodeList;
   private final PlanFragmentId fragmentId;
-
-  private Set<TRegionReplicaSet> allReplicaSets;
+  private final Set<TRegionReplicaSet> allReplicaSets;
+  private final boolean isGeneratedByPipe;
 
   public LoadTsFileScheduler(
       DistributedQueryPlan distributedQueryPlan,
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
-      IPartitionFetcher partitionFetcher) {
+      IPartitionFetcher partitionFetcher,
+      boolean isGeneratedByPipe) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
     this.fragmentId = 
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
-    this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager);
+    this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
+    this.isGeneratedByPipe = isGeneratedByPipe;
 
     for (FragmentInstance fragmentInstance : 
distributedQueryPlan.getInstances()) {
       tsFileNodeList.add((LoadSingleTsFileNode) 
fragmentInstance.getFragment().getPlanNodeTree());
@@ -279,6 +281,7 @@ public class LoadTsFileScheduler implements IScheduler {
     TLoadCommandReq loadCommandReq =
         new TLoadCommandReq(
             (isFirstPhaseSuccess ? LoadCommand.EXECUTE : 
LoadCommand.ROLLBACK).ordinal(), uuid);
+    loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
     Future<FragInstanceDispatchResult> dispatchResultFuture =
         dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b8089c96e22..b818f763e7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,6 +64,7 @@ public enum StatementType {
   BATCH_INSERT_ROWS,
   BATCH_INSERT_ONE_DEVICE,
   MULTI_BATCH_INSERT,
+  PIPE_ENRICHED_INSERT,
 
   DELETE,
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 0fc4bd25d98..17bee5aa40b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,6 +27,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -78,6 +80,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.CreateModel
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.DropModelStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowModelsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.model.ShowTrailsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
@@ -105,11 +112,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
@@ -302,6 +304,34 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(loadTsFileStatement, context);
   }
 
+  public R visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C context) {
+    return visitStatement(pipeEnrichedLoadTsFileStatement, context);
+  }
+
+  public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
+    return visitStatement(insertRowStatement, context);
+  }
+
+  public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+    return visitStatement(insertRowsStatement, context);
+  }
+
+  public R visitInsertMultiTablets(
+      InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
+    return visitStatement(insertMultiTabletsStatement, context);
+  }
+
+  public R visitInsertRowsOfOneDevice(
+      InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C context) {
+    return visitStatement(insertRowsOfOneDeviceStatement, context);
+  }
+
+  public R visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C context) {
+    return visitStatement(pipeEnrichedInsertBaseStatement, context);
+  }
+
   /** Data Control Language (DCL) */
   public R visitAuthor(AuthorStatement authorStatement, C context) {
     return visitStatement(authorStatement, context);
@@ -339,24 +369,6 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(countStatement, context);
   }
 
-  public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
-    return visitStatement(insertRowStatement, context);
-  }
-
-  public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) 
{
-    return visitStatement(insertRowsStatement, context);
-  }
-
-  public R visitInsertMultiTablets(
-      InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
-    return visitStatement(insertMultiTabletsStatement, context);
-  }
-
-  public R visitInsertRowsOfOneDevice(
-      InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C 
context) {
-    return visitStatement(insertRowsOfOneDeviceStatement, context);
-  }
-
   public R visitSchemaFetch(SchemaFetchStatement schemaFetchStatement, C 
context) {
     return visitStatement(schemaFetchStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index d118884cef6..848d5806f53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -279,8 +279,7 @@ public abstract class InsertBaseStatement extends Statement 
{
    *
    * @return map from device path to its measurements.
    */
-  protected final Map<PartialPath, List<Pair<String, Integer>>>
-      getMapFromDeviceToMeasurementAndIndex() {
+  protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() {
     boolean[] isLogicalView = new boolean[this.measurements.length];
     int[] indexMapToLogicalViewList = new int[this.measurements.length];
     Arrays.fill(isLogicalView, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 381d367afe9..a58da41af62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -69,6 +69,17 @@ public class LoadTsFileStatement extends Statement {
     sortTsFiles(tsFiles);
   }
 
+  protected LoadTsFileStatement() {
+    this.file = null;
+    this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+    this.verifySchema = true;
+    this.deleteAfterLoad = true;
+    this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+    this.tsFiles = new ArrayList<>();
+    this.resources = new ArrayList<>();
+    this.statementType = StatementType.MULTI_BATCH_INSERT;
+  }
+
   private void findAllTsFile(File file) {
     final File[] files = file.listFiles();
     if (files == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
new file mode 100644
index 00000000000..10834b20ecc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
+
+  private InsertBaseStatement insertBaseStatement;
+
+  public PipeEnrichedInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
+    statementType = StatementType.PIPE_ENRICHED_INSERT;
+    this.insertBaseStatement = insertBaseStatement;
+  }
+
+  public InsertBaseStatement getInsertBaseStatement() {
+    return insertBaseStatement;
+  }
+
+  public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
+    this.insertBaseStatement = insertBaseStatement;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedInsert(this, context);
+  }
+
+  @Override
+  public boolean isDebug() {
+    return insertBaseStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    insertBaseStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return insertBaseStatement.isQuery();
+  }
+
+  @Override
+  public boolean isAuthenticationRequired() {
+    return insertBaseStatement.isAuthenticationRequired();
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return insertBaseStatement.getDevicePath();
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    insertBaseStatement.setDevicePath(devicePath);
+  }
+
+  @Override
+  public String[] getMeasurements() {
+    return insertBaseStatement.getMeasurements();
+  }
+
+  @Override
+  public void setMeasurements(String[] measurements) {
+    insertBaseStatement.setMeasurements(measurements);
+  }
+
+  @Override
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return insertBaseStatement.getMeasurementSchemas();
+  }
+
+  @Override
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    insertBaseStatement.setMeasurementSchemas(measurementSchemas);
+  }
+
+  @Override
+  public boolean isAligned() {
+    return insertBaseStatement.isAligned();
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    insertBaseStatement.setAligned(aligned);
+  }
+
+  @Override
+  public TSDataType[] getDataTypes() {
+    return insertBaseStatement.getDataTypes();
+  }
+
+  @Override
+  public void setDataTypes(TSDataType[] dataTypes) {
+    insertBaseStatement.setDataTypes(dataTypes);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return insertBaseStatement.getPaths();
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    insertBaseStatement.updateAfterSchemaValidation();
+  }
+
+  @Override
+  protected void selfCheckDataTypes(int index)
+      throws DataTypeMismatchException, PathNotExistException {
+    insertBaseStatement.selfCheckDataTypes(index);
+  }
+
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    insertBaseStatement.markFailedMeasurement(index, cause);
+  }
+
+  @Override
+  public boolean hasValidMeasurements() {
+    return insertBaseStatement.hasValidMeasurements();
+  }
+
+  @Override
+  public boolean hasFailedMeasurements() {
+    return insertBaseStatement.hasFailedMeasurements();
+  }
+
+  @Override
+  public int getFailedMeasurementNumber() {
+    return insertBaseStatement.getFailedMeasurementNumber();
+  }
+
+  @Override
+  public List<String> getFailedMeasurements() {
+    return insertBaseStatement.getFailedMeasurements();
+  }
+
+  @Override
+  public List<Exception> getFailedExceptions() {
+    return insertBaseStatement.getFailedExceptions();
+  }
+
+  @Override
+  public List<String> getFailedMessages() {
+    return insertBaseStatement.getFailedMessages();
+  }
+
+  @Override
+  public void setFailedMeasurementIndex2Info(
+      Map<Integer, InsertBaseStatement.FailedMeasurementInfo> failedMeasurementIndex2Info) {
+    insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+  }
+
+  @Override
+  protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() {
+    return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return insertBaseStatement.isEmpty();
+  }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    return insertBaseStatement.getSchemaValidation();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertBaseStatement.getSchemaValidationList();
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
+  }
+
+  @Override
+  public long getMinTime() {
+    return insertBaseStatement.getMinTime();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    return insertBaseStatement.getFirstValueOfIndex(index);
+  }
+
+  @Override
+  public InsertBaseStatement removeLogicalView() {
+    return new PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
new file mode 100644
index 00000000000..c25749c4838
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.util.List;
+
+public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement { // delegating wrapper
+
+  private final LoadTsFileStatement loadTsFileStatement; // the wrapped statement
+
+  public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement 
loadTsFileStatement) {
+    this.loadTsFileStatement = loadTsFileStatement;
+    statementType = StatementType.MULTI_BATCH_INSERT; // NOTE(review): presumably the wrapped statement's type; confirm
+  }
+
+  public LoadTsFileStatement getLoadTsFileStatement() { // exposes the wrapped statement
+    return loadTsFileStatement;
+  }
+
+  @Override // every accessor from here through getPaths() forwards to loadTsFileStatement
+  public boolean isDebug() {
+    return loadTsFileStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    loadTsFileStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return loadTsFileStatement.isQuery();
+  }
+
+  @Override
+  public boolean isAuthenticationRequired() {
+    return loadTsFileStatement.isAuthenticationRequired();
+  }
+
+  @Override
+  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+    loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
+  }
+
+  @Override
+  public void setDatabaseLevel(int databaseLevel) {
+    loadTsFileStatement.setDatabaseLevel(databaseLevel);
+  }
+
+  @Override
+  public void setVerifySchema(boolean verifySchema) {
+    loadTsFileStatement.setVerifySchema(verifySchema);
+  }
+
+  @Override
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
+  }
+
+  @Override
+  public boolean isVerifySchema() {
+    return loadTsFileStatement.isVerifySchema();
+  }
+
+  @Override
+  public boolean isDeleteAfterLoad() {
+    return loadTsFileStatement.isDeleteAfterLoad();
+  }
+
+  @Override
+  public boolean isAutoCreateDatabase() {
+    return loadTsFileStatement.isAutoCreateDatabase();
+  }
+
+  @Override
+  public int getDatabaseLevel() {
+    return loadTsFileStatement.getDatabaseLevel();
+  }
+
+  @Override
+  public List<File> getTsFiles() {
+    return loadTsFileStatement.getTsFiles();
+  }
+
+  @Override
+  public void addTsFileResource(TsFileResource resource) {
+    loadTsFileStatement.addTsFileResource(resource);
+  }
+
+  @Override
+  public List<TsFileResource> getResources() {
+    return loadTsFileStatement.getResources();
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return loadTsFileStatement.getPaths();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedLoadFile(this, context); // passes the wrapper, not the delegate
+  }
+
+  @Override
+  public String toString() { // embeds the wrapped statement's own string form
+    return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" + 
loadTsFileStatement + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index d58eb871bac..8737969d724 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/CreatePipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index 7c8dbbf4358..e5a1717123a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/DropPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
index 84164e012be..0c7d209516c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/ShowPipesStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
 
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
index b908e9a9b35..54fa1fd97cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StartPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
index 989bf45bc5f..5dc50ee9eae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/pipe/StopPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.statement.sys.pipe;
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 9fc9b49c38d..d1b8cd8c1ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -796,13 +796,14 @@ public class StorageEngine implements IService {
     return RpcUtils.SUCCESS_STATUS;
   }
 
-  public TSStatus executeLoadCommand(LoadTsFileScheduler.LoadCommand 
loadCommand, String uuid) {
+  public TSStatus executeLoadCommand(
+      LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean 
isGeneratedByPipe) {
     TSStatus status = new TSStatus();
 
     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (getLoadTsFileManager().loadAll(uuid)) {
+          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..95ffb6264a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2197,8 +2197,10 @@ public class DataRegion implements IDataRegionForQuery {
    *
    * @param newTsFileResource tsfile resource @UsedBy load external tsfile 
module
    * @param deleteOriginFile whether to delete origin tsfile
+   * @param isGeneratedByPipe whether the load tsfile request is generated by 
pipe
    */
-  public void loadNewTsFile(TsFileResource newTsFileResource, boolean 
deleteOriginFile)
+  public void loadNewTsFile(
+      TsFileResource newTsFileResource, boolean deleteOriginFile, boolean 
isGeneratedByPipe)
       throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
@@ -2223,7 +2225,8 @@ public class DataRegion implements IDataRegionForQuery {
       loadTsFileToUnSequence(
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, 
deleteOriginFile);
 
-      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, 
newTsFileResource);
+      PipeInsertionDataNodeListener.getInstance()
+          .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
 
       FileMetrics.getInstance()
           .addFile(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..45e755debb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -893,7 +893,8 @@ public class TsFileProcessor {
       try {
         PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
         PipeInsertionDataNodeListener.getInstance()
-            .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), 
tsFileResource);
+            .listenToTsFile(
+                dataRegionInfo.getDataRegion().getDataRegionId(), 
tsFileResource, false);
 
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index bb07e4457de..70037a99089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,6 +85,7 @@ public class DataNodeThrottleQuotaManager {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
         return checkQuota(userName, 1, 0, s);
       case QUERY:
       case GROUP_BY_TIME:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index db9c12114e8..b6b714b5de2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.rescon.quotas;
 
 import org.apache.iotdb.commons.exception.RpcThrottlingException;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -28,6 +29,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -92,7 +95,11 @@ public class DefaultOperationQuota implements OperationQuota 
{
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, 
Statement s) {
     if (numWrites > 0) {
       long avgSize = 0;
-      switch (s.getType()) {
+      final StatementType statementType =
+          s.getType() == StatementType.PIPE_ENRICHED_INSERT
+              ? ((PipeEnrichedInsertBaseStatement) 
s).getInsertBaseStatement().getType()
+              : s.getType();
+      switch (statementType) {
         case INSERT:
           // InsertStatement  InsertRowStatement
           if (s instanceof InsertStatement) {
@@ -131,7 +138,10 @@ public class DefaultOperationQuota implements 
OperationQuota {
           }
           break;
         case MULTI_BATCH_INSERT:
-          // LoadTsFileStatement  InsertMultiTabletsStatement
+          // PipeEnrichedLoadTsFileStatement  LoadTsFileStatement  
InsertMultiTabletsStatement
+          if (s instanceof PipeEnrichedLoadTsFileStatement) {
+            s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
+          }
           if (s instanceof LoadTsFileStatement) {
             LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
             for (int i = 0; i < loadTsFileStatement.getResources().size(); 
i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 1d91ec43ddf..68f87bcc86f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -249,6 +250,25 @@ public class TriggerFireVisitor extends 
PlanVisitor<TriggerFireResult, TriggerEv
     return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : 
TriggerFireResult.SUCCESS;
   }
 
+  @Override // unwraps the node and routes it by the concrete type of the inner insert node
+  public TriggerFireResult visitPipeEnrichedInsert(
+      PipeEnrichedInsertNode node, TriggerEvent context) {
+    final InsertNode realInsertNode = node.getInsertNode(); // the insert node held by the wrapper
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else { // none of the insert node types above matched
+      return visitPlan(realInsertNode, context);
+    }
+  }
+
   private Map<String, Integer> constructMeasurementToSchemaIndexMap(
       String[] measurements, MeasurementSchema[] schemas) {
     // The index of measurement and schema is the same now.
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index f6dabc9b81d..1900ff539bb 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -286,7 +286,8 @@ public class PipeRealtimeExtractTest {
                         null,
                         false),
                     resource);
-            
PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, 
resource);
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToTsFile(dataRegionId, resource, false);
           }
         });
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
index f3c982ac5d3..dd35996e37e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/statement/sys/pipe/PipeStatementTest.java
@@ -21,11 +21,11 @@ package 
org.apache.iotdb.db.queryengine.plan.plan.statement.sys.pipe;
 
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.CreatePipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.DropPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.ShowPipesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StartPipeStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.pipe.StopPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 741db029ea0..161a95b4ec9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -61,7 +61,8 @@ public class LoadTsFileSchedulerTest {
                 mock(MPPQueryContext.class),
                 mock(QueryStateMachine.class),
                 mock(IClientManager.class),
-                mock(IPartitionFetcher.class)));
+                mock(IPartitionFetcher.class),
+                false));
     t.start();
     Assert.assertNull(t.getTotalCpuTime());
     Assert.assertNull(t.getFragmentInfo());
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 30e1e0d676f..1268c399c5f 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -313,6 +313,7 @@ struct TTsFilePieceReq{
 struct TLoadCommandReq{
     1: required i32 commandType
     2: required string uuid
+    3: optional bool isGeneratedByPipe // NOTE(review): presumably marks loads issued by a pipe; confirm
 }
 
 struct TLoadResp{

Reply via email to