This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch revert-pipe-mark
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e66ef6d6376a3c01d850def5001d8d8f6396b3d5
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Dec 15 10:08:30 2023 +0800

    Revert "Pipe Schema: Receiver Agent: Added pipe enriched planNode to enable 
pipe request detection to configure "forwarding-pipe-request" (#11672)"
    
    This reverts commit 3e4c6a4df918513cb1c002614da3837090a4dfc8.
---
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   2 +-
 .../dataregion/DataExecutionVisitor.java           |  29 ++-
 .../schemaregion/SchemaExecutionVisitor.java       |  14 --
 .../protocol/writeback/WriteBackConnector.java     |   4 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  10 +-
 .../execution/executor/RegionWriteExecutor.java    | 276 +++------------------
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  42 +++-
 .../queryengine/plan/execution/QueryExecution.java |  10 +-
 .../plan/planner/LogicalPlanVisitor.java           |  56 +++--
 .../plan/planner/plan/node/PlanNode.java           |  10 -
 .../plan/planner/plan/node/PlanNodeType.java       |  18 +-
 .../plan/planner/plan/node/PlanVisitor.java        |  23 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   7 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |   2 +-
 .../metedata/write/view/AlterLogicalViewNode.java  |  43 +++-
 .../node/pipe/PipeEnrichedConfigSchemaNode.java    | 161 ------------
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 169 -------------
 .../node/pipe/PipeEnrichedWriteSchemaNode.java     | 192 --------------
 .../plan/planner/plan/node/write/InsertNode.java   |  10 +
 .../{pipe => write}/PipeEnrichedInsertNode.java    |  19 +-
 .../queryengine/plan/statement/StatementType.java  |   3 +-
 .../plan/statement/StatementVisitor.java           |  13 +-
 .../crud/PipeEnrichedInsertBaseStatement.java      | 220 ++++++++++++++++
 .../crud/PipeEnrichedLoadTsFileStatement.java      | 137 ++++++++++
 .../plan/statement/pipe/PipeEnrichedStatement.java |  72 ------
 .../quotas/DataNodeThrottleQuotaManager.java       |   2 +-
 .../rescon/quotas/DefaultOperationQuota.java       |  16 +-
 .../db/trigger/executor/TriggerFireVisitor.java    |  17 +-
 28 files changed, 595 insertions(+), 982 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 4f6cecb32a4..ed503128aee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -215,7 +215,7 @@ public class AuditLogger {
       case BATCH_INSERT_ROWS:
       case BATCH_INSERT_ONE_DEVICE:
       case MULTI_BATCH_INSERT:
-      case PIPE_ENRICHED:
+      case PIPE_ENRICHED_INSERT:
       case DELETE:
       case SELECT_INTO:
       case LOAD_FILES:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 845532bc96d..53813fb93ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -27,14 +27,14 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -169,8 +169,23 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
 
   @Override
   public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, 
DataRegion context) {
-    node.getInsertNode().markAsGeneratedByPipe();
-    return node.getInsertNode().accept(this, context);
+    final InsertNode realInsertNode = node.getInsertNode();
+
+    realInsertNode.markAsGeneratedByPipe();
+
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else {
+      return visitPlan(realInsertNode, context);
+    }
   }
 
   @Override
@@ -200,10 +215,4 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
     }
   }
-
-  @Override
-  public TSStatus visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, 
DataRegion context) {
-    node.getDeleteDataNode().markAsGeneratedByPipe();
-    return visitDeleteData((DeleteDataNode) node.getDeleteDataNode(), context);
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index a1496e7b6d7..904863ddcc6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -50,8 +50,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
@@ -522,18 +520,6 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
     }
   }
 
-  @Override
-  public TSStatus visitPipeEnrichedWriteSchema(
-      PipeEnrichedWriteSchemaNode node, ISchemaRegion schemaRegion) {
-    return node.getWriteSchemaNode().accept(this, schemaRegion);
-  }
-
-  @Override
-  public TSStatus visitPipeEnrichedConfigSchema(
-      PipeEnrichedConfigSchemaNode node, ISchemaRegion schemaRegion) {
-    return node.getConfigSchemaNode().accept(this, schemaRegion);
-  }
-
   @Override
   public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
     return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 9a2883e6935..eb262704c68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -159,7 +159,7 @@ public class WriteBackConnector implements PipeConnector {
   private TSStatus executeStatement(InsertBaseStatement statement) {
     return Coordinator.getInstance()
         .execute(
-            new PipeEnrichedStatement(statement),
+            new PipeEnrichedInsertBaseStatement(statement),
             SessionManager.getInstance().requestQueryId(),
             new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
             "",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 128d59af02b..03c58874292 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -48,11 +48,13 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -554,7 +556,11 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    statement = new PipeEnrichedStatement(statement);
+    if (statement instanceof InsertBaseStatement) {
+      statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) 
statement);
+    } else if (statement instanceof LoadTsFileStatement) {
+      statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement) 
statement);
+    }
 
     final ExecutionResult result =
         Coordinator.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 7c6348fdbb4..8c53c63bddb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -52,9 +52,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -62,6 +59,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -106,10 +104,6 @@ public class RegionWriteExecutor {
 
   private final TriggerFireVisitor triggerFireVisitor;
 
-  private final WritePlanNodeExecutionVisitor executionVisitor;
-
-  private final PipeEnrichedWriteSchemaNodeExecutionVisitor 
pipeExecutionVisitor;
-
   public RegionWriteExecutor() {
     dataRegionConsensus = DataRegionConsensusImpl.getInstance();
     schemaRegionConsensus = SchemaRegionConsensusImpl.getInstance();
@@ -117,8 +111,6 @@ public class RegionWriteExecutor {
     schemaEngine = SchemaEngine.getInstance();
     clusterTemplateManager = ClusterTemplateManager.getInstance();
     triggerFireVisitor = new TriggerFireVisitor();
-    executionVisitor = new WritePlanNodeExecutionVisitor();
-    pipeExecutionVisitor = new 
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
   }
 
   @TestOnly
@@ -135,8 +127,6 @@ public class RegionWriteExecutor {
     this.schemaEngine = schemaEngine;
     this.clusterTemplateManager = clusterTemplateManager;
     this.triggerFireVisitor = triggerFireVisitor;
-    executionVisitor = new WritePlanNodeExecutionVisitor();
-    pipeExecutionVisitor = new 
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
   }
 
   @SuppressWarnings("squid:S1181")
@@ -144,6 +134,7 @@ public class RegionWriteExecutor {
     try {
       WritePlanNodeExecutionContext context =
           new WritePlanNodeExecutionContext(groupId, 
regionManager.getRegionLock(groupId));
+      WritePlanNodeExecutionVisitor executionVisitor = new 
WritePlanNodeExecutionVisitor();
       return planNode.accept(executionVisitor, context);
     } catch (Throwable e) {
       LOGGER.error(e.getMessage(), e);
@@ -255,13 +246,13 @@ public class RegionWriteExecutor {
       }
     }
 
-    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode 
insertNode)
+    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode 
planNode)
         throws ConsensusException {
       long triggerCostTime = 0;
       TSStatus status;
       long startTime = System.nanoTime();
       // fire Trigger before the insertion
-      TriggerFireResult result = triggerFireVisitor.process(insertNode, 
TriggerEvent.BEFORE_INSERT);
+      TriggerFireResult result = triggerFireVisitor.process(planNode, 
TriggerEvent.BEFORE_INSERT);
       triggerCostTime += (System.nanoTime() - startTime);
       if (result.equals(TriggerFireResult.TERMINATION)) {
         status =
@@ -270,14 +261,14 @@ public class RegionWriteExecutor {
                 "Failed to complete the insertion because trigger error before 
the insertion.");
       } else {
         long startWriteTime = System.nanoTime();
-        status = dataRegionConsensus.write(groupId, insertNode);
+        status = dataRegionConsensus.write(groupId, planNode);
         
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - 
startWriteTime);
 
         // fire Trigger after the insertion
         startTime = System.nanoTime();
         boolean hasFailedTriggerBeforeInsertion =
             result.equals(TriggerFireResult.FAILED_NO_TERMINATION);
-        result = triggerFireVisitor.process(insertNode, 
TriggerEvent.AFTER_INSERT);
+        result = triggerFireVisitor.process(planNode, 
TriggerEvent.AFTER_INSERT);
         if (hasFailedTriggerBeforeInsertion || 
!result.equals(TriggerFireResult.SUCCESS)) {
           status =
               RpcUtils.getStatus(
@@ -290,18 +281,6 @@ public class RegionWriteExecutor {
       return status;
     }
 
-    @Override
-    public RegionExecutionResult visitPipeEnrichedDeleteData(
-        PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext 
context) {
-      // data deletion should block data insertion, especially when executed 
for deleting timeseries
-      context.getRegionWriteValidationRWLock().writeLock().lock();
-      try {
-        return super.visitPipeEnrichedDeleteData(node, context);
-      } finally {
-        context.getRegionWriteValidationRWLock().writeLock().unlock();
-      }
-    }
-
     @Override
     public RegionExecutionResult visitDeleteData(
         DeleteDataNode node, WritePlanNodeExecutionContext context) {
@@ -317,13 +296,6 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitCreateTimeSeries(
         CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
-      return executeCreateTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeCreateTimeSeries(
-        CreateTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -340,9 +312,7 @@ public class RegionWriteExecutor {
                   Collections.singletonList(node.getPath().getMeasurement()),
                   Collections.singletonList(node.getAlias()));
           if (failingMeasurementMap.isEmpty()) {
-            return receivedFromPipe
-                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-                : super.visitCreateTimeSeries(node, context);
+            return super.visitCreateTimeSeries(node, context);
           } else {
             MetadataException metadataException = failingMeasurementMap.get(0);
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -358,22 +328,13 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitCreateTimeSeries(node, context);
+        return super.visitCreateTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitCreateAlignedTimeSeries(
         CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return executeCreateAlignedTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeCreateAlignedTimeSeries(
-        CreateAlignedTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -389,9 +350,7 @@ public class RegionWriteExecutor {
               schemaRegion.checkMeasurementExistence(
                   node.getDevicePath(), node.getMeasurements(), 
node.getAliasList());
           if (failingMeasurementMap.isEmpty()) {
-            return receivedFromPipe
-                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-                : super.visitCreateAlignedTimeSeries(node, context);
+            return super.visitCreateAlignedTimeSeries(node, context);
           } else {
             MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -407,22 +366,13 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitCreateAlignedTimeSeries(node, context);
+        return super.visitCreateAlignedTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitCreateMultiTimeSeries(
         CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
-      return executeCreateMultiTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeCreateMultiTimeSeries(
-        CreateMultiTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result;
@@ -450,8 +400,7 @@ public class RegionWriteExecutor {
           }
 
           RegionExecutionResult failingResult =
-              registerTimeSeries(
-                  measurementGroupMap, node, context, failingStatus, 
receivedFromPipe);
+              registerTimeSeries(measurementGroupMap, node, context, 
failingStatus);
 
           if (failingResult != null) {
             return failingResult;
@@ -467,9 +416,7 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitCreateMultiTimeSeries(node, context);
+        return super.visitCreateMultiTimeSeries(node, context);
       }
     }
 
@@ -508,14 +455,10 @@ public class RegionWriteExecutor {
         Map<PartialPath, MeasurementGroup> measurementGroupMap,
         CreateMultiTimeSeriesNode node,
         WritePlanNodeExecutionContext context,
-        List<TSStatus> failingStatus,
-        boolean receivedFromPipe) {
+        List<TSStatus> failingStatus) {
       if (!measurementGroupMap.isEmpty()) {
         // try registering the rest timeseries
-        RegionExecutionResult executionResult =
-            receivedFromPipe
-                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-                : super.visitCreateMultiTimeSeries(node, context);
+        RegionExecutionResult executionResult = 
super.visitCreateMultiTimeSeries(node, context);
         if (failingStatus.isEmpty()) {
           return executionResult;
         }
@@ -533,13 +476,6 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitInternalCreateTimeSeries(
         InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return executeInternalCreateTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeInternalCreateTimeSeries(
-        InternalCreateTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -583,32 +519,20 @@ public class RegionWriteExecutor {
           measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
 
           return processExecutionResultOfInternalCreateSchema(
-              receivedFromPipe
-                  ? super.visitPipeEnrichedWriteSchema(
-                      new PipeEnrichedWriteSchemaNode(node), context)
-                  : super.visitInternalCreateTimeSeries(node, context),
+              super.visitInternalCreateTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitInternalCreateTimeSeries(node, context);
+        return super.visitInternalCreateTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitInternalCreateMultiTimeSeries(
         InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return executeInternalCreateMultiTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeInternalCreateMultiTimeSeries(
-        InternalCreateMultiTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result;
@@ -662,19 +586,14 @@ public class RegionWriteExecutor {
           }
 
           return processExecutionResultOfInternalCreateSchema(
-              receivedFromPipe
-                  ? super.visitPipeEnrichedWriteSchema(
-                      new PipeEnrichedWriteSchemaNode(node), context)
-                  : super.visitInternalCreateMultiTimeSeries(node, context),
+              super.visitInternalCreateMultiTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitInternalCreateMultiTimeSeries(node, context);
+        return super.visitInternalCreateMultiTimeSeries(node, context);
       }
     }
 
@@ -757,22 +676,17 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitAlterTimeSeries(
         AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
-      return executeAlterTimeSeries(node, context, false);
-    }
-
-    private RegionExecutionResult executeAlterTimeSeries(
-        AlterTimeSeriesNode node, WritePlanNodeExecutionContext context, 
boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       try {
         MeasurementPath measurementPath = 
schemaRegion.fetchMeasurementPath(node.getPath());
-        if (node.isAlterView() && 
!measurementPath.getMeasurementSchema().isLogicalView()) {
-          throw new MetadataException(
-              String.format("%s is not view.", measurementPath.getFullPath()));
+        if (node.isAlterView()) {
+          if (!measurementPath.getMeasurementSchema().isLogicalView()) {
+            throw new MetadataException(
+                String.format("%s is not view.", 
measurementPath.getFullPath()));
+          }
         }
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitAlterTimeSeries(node, context);
+        return super.visitAlterTimeSeries(node, context);
       } catch (MetadataException e) {
         RegionExecutionResult result = new RegionExecutionResult();
         result.setAccepted(true);
@@ -785,13 +699,6 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitActivateTemplate(
         ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
-      return executeActivateTemplate(node, context, false);
-    }
-
-    private RegionExecutionResult executeActivateTemplate(
-        ActivateTemplateNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -815,13 +722,7 @@ public class RegionWriteExecutor {
         RegionExecutionResult result =
             checkQuotaBeforeCreatingTimeSeries(
                 schemaRegion, node.getActivatePath(), 
templateSetInfo.left.getMeasurementNumber());
-        if (result == null) {
-          return receivedFromPipe
-              ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-              : super.visitActivateTemplate(node, context);
-        } else {
-          return result;
-        }
+        return result == null ? super.visitActivateTemplate(node, context) : 
result;
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -830,13 +731,6 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitBatchActivateTemplate(
         BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) 
{
-      return executeBatchActivateTemplate(node, context, false);
-    }
-
-    private RegionExecutionResult executeBatchActivateTemplate(
-        BatchActivateTemplateNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -866,9 +760,7 @@ public class RegionWriteExecutor {
           }
         }
 
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitBatchActivateTemplate(node, context);
+        return super.visitBatchActivateTemplate(node, context);
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -877,13 +769,6 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitInternalBatchActivateTemplate(
         InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext 
context) {
-      return executeInternalBatchActivateTemplate(node, context, false);
-    }
-
-    private RegionExecutionResult executeInternalBatchActivateTemplate(
-        InternalBatchActivateTemplateNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -916,9 +801,7 @@ public class RegionWriteExecutor {
           }
         }
 
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitInternalBatchActivateTemplate(node, context);
+        return super.visitInternalBatchActivateTemplate(node, context);
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -927,19 +810,12 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitCreateLogicalView(
         CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
-      return executeCreateLogicalView(node, context, false);
-    }
-
-    private RegionExecutionResult executeCreateLogicalView(
-        CreateLogicalViewNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          // step 1. make sure all target paths do NOT exist.
+          // step 1. make sure all target paths are NOT exist.
           List<PartialPath> targetPaths = node.getViewPathList();
           List<MetadataException> failingMetadataException = new ArrayList<>();
           for (PartialPath thisPath : targetPaths) {
@@ -949,12 +825,12 @@ public class RegionWriteExecutor {
                     thisPath.getDevicePath(),
                     Collections.singletonList(thisPath.getMeasurement()),
                     null);
-            // merge all exceptions into one map
+            // merge all exception into one map
             for (Map.Entry<Integer, MetadataException> entry : 
failingMeasurementMap.entrySet()) {
               failingMetadataException.add(entry.getValue());
             }
           }
-          // if there are some exceptions, handle each exception and return 
first of them.
+          // if there is some exception, handle each exception and return 
first of them.
           if (!failingMetadataException.isEmpty()) {
             MetadataException metadataException = 
failingMetadataException.get(0);
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -966,102 +842,16 @@ public class RegionWriteExecutor {
                     metadataException.getErrorCode(), 
metadataException.getMessage()));
             return result;
           }
-          // step 2. make sure all source paths exist.
-          return receivedFromPipe
-              ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-              : super.visitCreateLogicalView(node, context);
+          // step 2. make sure all source paths are existed.
+          return super.visitCreateLogicalView(node, context);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return receivedFromPipe
-            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
-            : super.visitCreateLogicalView(node, context);
+        return super.visitCreateLogicalView(node, context);
       }
       // end of visitCreateLogicalView
     }
-
-    @Override
-    public RegionExecutionResult visitPipeEnrichedWriteSchema(
-        PipeEnrichedWriteSchemaNode node, WritePlanNodeExecutionContext 
context) {
-      return node.getWriteSchemaNode().accept(pipeExecutionVisitor, context);
-    }
-  }
-
-  private class PipeEnrichedWriteSchemaNodeExecutionVisitor
-      extends PlanVisitor<RegionExecutionResult, 
WritePlanNodeExecutionContext> {
-
-    WritePlanNodeExecutionVisitor visitor;
-
-    private 
PipeEnrichedWriteSchemaNodeExecutionVisitor(WritePlanNodeExecutionVisitor 
visitor) {
-      this.visitor = visitor;
-    }
-
-    @Override
-    public RegionExecutionResult visitPlan(PlanNode node, 
WritePlanNodeExecutionContext context) {
-      throw new UnsupportedOperationException(
-          "PipeEnrichedWriteSchemaNodeExecutionVisitor does not support 
visiting general plan.");
-    }
-
-    @Override
-    public RegionExecutionResult visitCreateTimeSeries(
-        CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
-      return visitor.executeCreateTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitCreateAlignedTimeSeries(
-        CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return visitor.executeCreateAlignedTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitCreateMultiTimeSeries(
-        CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
-      return visitor.executeCreateMultiTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitInternalCreateTimeSeries(
-        InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return visitor.executeInternalCreateTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitInternalCreateMultiTimeSeries(
-        InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
-      return visitor.executeInternalCreateMultiTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitAlterTimeSeries(
-        AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
-      return visitor.executeAlterTimeSeries(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitActivateTemplate(
-        ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
-      return visitor.executeActivateTemplate(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitBatchActivateTemplate(
-        BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) 
{
-      return visitor.executeBatchActivateTemplate(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitInternalBatchActivateTemplate(
-        InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext 
context) {
-      return visitor.executeInternalBatchActivateTemplate(node, context, true);
-    }
-
-    @Override
-    public RegionExecutionResult visitCreateLogicalView(
-        CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
-      return visitor.executeCreateLogicalView(node, context, true);
-    }
   }
 
   private static class WritePlanNodeExecutionContext {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 6ddf58895ac..89842c3d1af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -100,6 +100,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -131,7 +133,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPath
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
@@ -2569,13 +2570,33 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   }
 
   @Override
-  public Analysis visitPipeEnrichedStatement(
-      PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
-    Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
+  public Analysis visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
+    Analysis analysis;
+
+    final InsertBaseStatement insertBaseStatement =
+        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+    if (insertBaseStatement instanceof InsertTabletStatement) {
+      analysis = visitInsertTablet((InsertTabletStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+      analysis =
+          visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowStatement) {
+      analysis = visitInsertRow((InsertRowStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertRowsStatement) {
+      analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+      analysis =
+          visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement) 
insertBaseStatement, context);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
+    }
 
     // statement may be changed because of logical view
-    pipeEnrichedStatement.setInnerStatement(analysis.getStatement());
-    analysis.setStatement(pipeEnrichedStatement);
+    pipeEnrichedInsertBaseStatement.setInsertBaseStatement(
+        (InsertBaseStatement) analysis.getStatement());
+    analysis.setStatement(pipeEnrichedInsertBaseStatement);
     return analysis;
   }
 
@@ -2632,6 +2653,15 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         .analyzeFileByFile();
   }
 
+  @Override
+  public Analysis visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
+    final Analysis analysis =
+        
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
+    analysis.setStatement(pipeEnrichedLoadTsFileStatement);
+    return analysis;
+  }
+
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams, String userName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 9af8cfa867b..ce33e169b36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -65,7 +65,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -318,11 +318,7 @@ public class QueryExecution implements IQueryExecution {
 
   private void schedule() {
     final long startTime = System.nanoTime();
-    boolean isPipeEnrichedTsFileLoad =
-        rawStatement instanceof PipeEnrichedStatement
-            && ((PipeEnrichedStatement) rawStatement).getInnerStatement()
-                instanceof LoadTsFileStatement;
-    if (rawStatement instanceof LoadTsFileStatement || 
isPipeEnrichedTsFileLoad) {
+    if (rawStatement instanceof LoadTsFileStatement) {
       this.scheduler =
           new LoadTsFileScheduler(
               distributedPlan,
@@ -330,7 +326,7 @@ public class QueryExecution implements IQueryExecution {
               stateMachine,
               syncInternalServiceClientManager,
               partitionFetcher,
-              isPipeEnrichedTsFileLoad);
+              rawStatement instanceof PipeEnrichedLoadTsFileStatement);
       this.scheduler.start();
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index e55521ea107..090c2a55a6f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -44,9 +43,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -55,16 +51,20 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -87,7 +87,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -559,20 +558,37 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   }
 
   @Override
-  public PlanNode visitPipeEnrichedStatement(
-      PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
-    WritePlanNode node =
-        (WritePlanNode) pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
-
-    if (node instanceof LoadTsFileNode) {
-      return node;
-    } else if (node instanceof InsertNode) {
-      return new PipeEnrichedInsertNode((InsertNode) node);
-    } else if (node instanceof DeleteDataNode) {
-      return new PipeEnrichedDeleteDataNode((DeleteDataNode) node);
+  public PlanNode visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
+    InsertNode insertNode;
+
+    final InsertBaseStatement insertBaseStatement =
+        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
+    if (insertBaseStatement instanceof InsertRowStatement) {
+      insertNode =
+          (InsertRowNode) visitInsertRow((InsertRowStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowsStatement) {
+      insertNode =
+          (InsertRowsNode) visitInsertRows((InsertRowsStatement) 
insertBaseStatement, context);
+    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
+      insertNode =
+          (InsertRowsOfOneDeviceNode)
+              visitInsertRowsOfOneDevice(
+                  (InsertRowsOfOneDeviceStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertTabletStatement) {
+      insertNode =
+          (InsertTabletNode)
+              visitInsertTablet((InsertTabletStatement) insertBaseStatement, 
context);
+    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
+      insertNode =
+          (InsertMultiTabletsNode)
+              visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
     }
 
-    return new PipeEnrichedWriteSchemaNode(node);
+    return new PipeEnrichedInsertNode(insertNode);
   }
 
   @Override
@@ -581,6 +597,12 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
         context.getQueryId().genPlanNodeId(), 
loadTsFileStatement.getResources());
   }
 
+  @Override
+  public PlanNode visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
+    return 
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
+  }
+
   @Override
   public PlanNode visitShowTimeSeries(
       ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 18f678974b7..2b73576ea03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -46,8 +46,6 @@ public abstract class PlanNode implements IConsensusRequest {
 
   protected PlanNodeId id;
 
-  protected boolean isGeneratedByPipe = false;
-
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
     this.id = id;
@@ -61,14 +59,6 @@ public abstract class PlanNode implements IConsensusRequest {
     this.id = id;
   }
 
-  public boolean isGeneratedByPipe() {
-    return isGeneratedByPipe;
-  }
-
-  public void markAsGeneratedByPipe() {
-    isGeneratedByPipe = true;
-  }
-
   public abstract List<PlanNode> getChildren();
 
   public abstract void addChild(PlanNode child);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index a17ea3493fd..ea819b466f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -56,10 +56,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -101,6 +97,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
@@ -192,11 +189,7 @@ public enum PlanNodeType {
 
   LAST_QUERY_TRANSFORM((short) 81),
   TOP_K((short) 82),
-  COLUMN_INJECT((short) 83),
-  PIPE_ENRICHED_DELETE_DATA((short) 84),
-  PIPE_ENRICHED_WRITE_SCHEMA((short) 85),
-  PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
-  ;
+  COLUMN_INJECT((short) 83);
 
   public static final int BYTES = Short.BYTES;
 
@@ -411,13 +404,6 @@ public enum PlanNodeType {
         return TopKNode.deserialize(buffer);
       case 83:
         return ColumnInjectNode.deserialize(buffer);
-      case 84:
-        return PipeEnrichedDeleteDataNode.deserialize(buffer);
-      case 85:
-        return PipeEnrichedWriteSchemaNode.deserialize(buffer);
-      case 86:
-        return PipeEnrichedConfigSchemaNode.deserialize(buffer);
-
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 104242486d7..9dbb3df3b57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -53,10 +53,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -101,6 +97,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 
 public abstract class PlanVisitor<R, C> {
 
@@ -442,27 +439,11 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
-  public R visitDeleteData(DeleteDataNode node, C context) {
-    return visitPlan(node, context);
-  }
-
-  
/////////////////////////////////////////////////////////////////////////////////////////////////
-  // Pipe Enriched Node
-  
/////////////////////////////////////////////////////////////////////////////////////////////////
-
   public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, C 
context) {
-    return visitPlan(node, context);
-  }
-
-  public R visitPipeEnrichedWriteSchema(PipeEnrichedWriteSchemaNode node, C 
context) {
-    return visitPlan(node, context);
-  }
-
-  public R visitPipeEnrichedConfigSchema(PipeEnrichedConfigSchemaNode node, C 
context) {
+  public R visitDeleteData(DeleteDataNode node, C context) {
     return visitPlan(node, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 997f5bd89e6..23a5620a611 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
@@ -89,11 +88,7 @@ public class LoadTsFileNode extends WritePlanNode {
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     List<WritePlanNode> res = new ArrayList<>();
-    LoadTsFileStatement statement =
-        analysis.getStatement() instanceof PipeEnrichedStatement
-            ? (LoadTsFileStatement)
-                ((PipeEnrichedStatement) 
analysis.getStatement()).getInnerStatement()
-            : (LoadTsFileStatement) analysis.getStatement();
+    LoadTsFileStatement statement = (LoadTsFileStatement) 
analysis.getStatement();
 
     for (int i = 0; i < resources.size(); i++) {
       res.add(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 060acc2f9ca..43b8f9c6510 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -187,7 +187,7 @@ public class CreateTimeSeriesNode extends WritePlanNode 
implements ICreateTimeSe
 
   public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
     String id;
-    PartialPath path;
+    PartialPath path = null;
     TSDataType dataType;
     TSEncoding encoding;
     CompressionType compressor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
index 664575e1bf7..4122ebfb461 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
@@ -19,13 +19,16 @@
 
 package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -38,7 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class AlterLogicalViewNode extends PlanNode {
+public class AlterLogicalViewNode extends WritePlanNode {
 
   /**
    * A map from target path to source expression. Yht target path is the name 
of this logical view,
@@ -46,6 +49,12 @@ public class AlterLogicalViewNode extends PlanNode {
    */
   private final Map<PartialPath, ViewExpression> viewPathToSourceMap;
 
+  /**
+   * Always null: the field is final and never assigned. splitByPartition()
+   * passes each region replica set to the split nodes it creates instead.
+   */
+  private final TRegionReplicaSet regionReplicaSet = null;
+
   public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression> 
viewPathToSourceMap) {
     super(id);
     this.viewPathToSourceMap = viewPathToSourceMap;
@@ -62,6 +71,11 @@ public class AlterLogicalViewNode extends PlanNode {
     return visitor.visitAlterLogicalView(this, context);
   }
 
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return this.regionReplicaSet;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return new ArrayList<>();
@@ -146,5 +160,32 @@ public class AlterLogicalViewNode extends PlanNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap);
   }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new 
HashMap<>();
+    for (Map.Entry<PartialPath, ViewExpression> entry : 
this.viewPathToSourceMap.entrySet()) {
+      // for each entry in the map for target path to source expression,
+      // build a map from TRegionReplicaSet to this entry.
+      // Please note that getSchemaRegionReplicaSet needs a device path as 
parameter.
+      TRegionReplicaSet regionReplicaSet =
+          
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getDevice());
+
+      // create a map if the key (regionReplicaSet) does not exist yet,
+      // then put this entry into this map(from regionReplicaSet to this entry)
+      splitMap
+          .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+    }
+
+    // split this node into several nodes according to their regionReplicaSet
+    List<WritePlanNode> result = new ArrayList<>();
+    for (Map.Entry<TRegionReplicaSet, Map<PartialPath, ViewExpression>> entry :
+        splitMap.entrySet()) {
+      // for each entry in splitMap, create a plan node.
+      result.add(new CreateLogicalViewNode(getPlanNodeId(), entry.getValue(), 
entry.getKey()));
+    }
+    return result;
+  }
   // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
deleted file mode 100644
index 1ff6cabb7a2..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
-
-import 
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
-import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
-import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * This class aims to mark the {@link PlanNode} of schema operation assigned 
by configNode to
- * selectively forward the operations, {@link Statement}s of which extending 
{@link
- * IConfigStatement}. The handling logic is defined only in {@link 
SchemaExecutionVisitor} to
- * execute deletion logic and mark it as received, since the request does not 
need to pass {@link
- * RegionWriteExecutor} and is assigned directly to regions by configNode.
- *
- * <p>
- *
- * <p>Notes: The contents includes all the collected {@link PlanNode}s of 
schema deletion:
- *
- * <p>- Timeseries: {@link DeleteTimeSeriesNode}
- *
- * <p>- Template: {@link DeactivateTemplateNode}
- *
- * <p>- LogicalView: {@link AlterLogicalViewNode}, {@link 
DeleteLogicalViewNode}
- *
- * <p>Intermediate nodes like {@link ConstructSchemaBlackListNode} will not be 
included since they
- * will not be collected by pipe.
- */
-public class PipeEnrichedConfigSchemaNode extends PlanNode {
-
-  private final PlanNode configSchemaNode;
-
-  public PipeEnrichedConfigSchemaNode(PlanNode configSchemaNode) {
-    super(configSchemaNode.getPlanNodeId());
-    this.configSchemaNode = configSchemaNode;
-  }
-
-  public PlanNode getConfigSchemaNode() {
-    return configSchemaNode;
-  }
-
-  @Override
-  public boolean isGeneratedByPipe() {
-    return configSchemaNode.isGeneratedByPipe();
-  }
-
-  @Override
-  public void markAsGeneratedByPipe() {
-    configSchemaNode.markAsGeneratedByPipe();
-  }
-
-  @Override
-  public PlanNodeId getPlanNodeId() {
-    return configSchemaNode.getPlanNodeId();
-  }
-
-  @Override
-  public void setPlanNodeId(PlanNodeId id) {
-    configSchemaNode.setPlanNodeId(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return configSchemaNode.getChildren();
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    configSchemaNode.addChild(child);
-  }
-
-  @Override
-  public PlanNode clone() {
-    return new PipeEnrichedConfigSchemaNode(configSchemaNode.clone());
-  }
-
-  @Override
-  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
-    return new PipeEnrichedConfigSchemaNode(
-        configSchemaNode.createSubNode(subNodeId, startIndex, endIndex));
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return new 
PipeEnrichedConfigSchemaNode(configSchemaNode.cloneWithChildren(children));
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return configSchemaNode.allowedChildCount();
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return configSchemaNode.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedConfigSchema(this, context);
-  }
-
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(byteBuffer);
-    configSchemaNode.serialize(byteBuffer);
-  }
-
-  @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(stream);
-    configSchemaNode.serialize(stream);
-  }
-
-  public static PipeEnrichedConfigSchemaNode deserialize(ByteBuffer buffer) {
-    return new PipeEnrichedConfigSchemaNode(PlanNodeType.deserialize(buffer));
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return o instanceof PipeEnrichedConfigSchemaNode
-        && configSchemaNode.equals(((PipeEnrichedConfigSchemaNode) 
o).configSchemaNode);
-  }
-
-  @Override
-  public int hashCode() {
-    return configSchemaNode.hashCode();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
deleted file mode 100644
index 29d2e011c58..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
-
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
-import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * This class aims to mark the {@link DeleteDataNode} to enable selectively 
forwarding pipe
- * deletion. The handling logic is defined in:
- *
- * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data 
region.
- *
- * <p>2.{@link DataExecutionVisitor}, to actually write data on data region 
and mark it as received
- * from pipe.
- */
-public class PipeEnrichedDeleteDataNode extends DeleteDataNode {
-
-  private final DeleteDataNode deleteDataNode;
-
-  public PipeEnrichedDeleteDataNode(DeleteDataNode deleteDataNode) {
-    super(
-        deleteDataNode.getPlanNodeId(),
-        deleteDataNode.getPathList(),
-        deleteDataNode.getDeleteStartTime(),
-        deleteDataNode.getDeleteEndTime());
-    this.deleteDataNode = deleteDataNode;
-  }
-
-  public PlanNode getDeleteDataNode() {
-    return deleteDataNode;
-  }
-
-  @Override
-  public boolean isGeneratedByPipe() {
-    return deleteDataNode.isGeneratedByPipe();
-  }
-
-  @Override
-  public void markAsGeneratedByPipe() {
-    deleteDataNode.markAsGeneratedByPipe();
-  }
-
-  @Override
-  public PlanNodeId getPlanNodeId() {
-    return deleteDataNode.getPlanNodeId();
-  }
-
-  @Override
-  public void setPlanNodeId(PlanNodeId id) {
-    deleteDataNode.setPlanNodeId(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return deleteDataNode.getChildren();
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    deleteDataNode.addChild(child);
-  }
-
-  @Override
-  public DeleteDataNode clone() {
-    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
deleteDataNode.clone());
-  }
-
-  @Override
-  public DeleteDataNode createSubNode(int subNodeId, int startIndex, int 
endIndex) {
-    return new PipeEnrichedDeleteDataNode(
-        (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, 
endIndex));
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return new PipeEnrichedDeleteDataNode(
-        (DeleteDataNode) deleteDataNode.cloneWithChildren(children));
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return deleteDataNode.allowedChildCount();
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return deleteDataNode.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedDeleteData(this, context);
-  }
-
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(byteBuffer);
-    deleteDataNode.serialize(byteBuffer);
-  }
-
-  @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(stream);
-    deleteDataNode.serialize(stream);
-  }
-
-  public static PipeEnrichedDeleteDataNode deserialize(ByteBuffer buffer) {
-    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
PlanNodeType.deserialize(buffer));
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return o instanceof PipeEnrichedDeleteDataNode
-        && deleteDataNode.equals(((PipeEnrichedDeleteDataNode) 
o).deleteDataNode);
-  }
-
-  @Override
-  public int hashCode() {
-    return deleteDataNode.hashCode();
-  }
-
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return deleteDataNode.getRegionReplicaSet();
-  }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return deleteDataNode.splitByPartition(analysis).stream()
-        .map(
-            plan ->
-                plan instanceof PipeEnrichedDeleteDataNode
-                    ? plan
-                    : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
-        .collect(Collectors.toList());
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
deleted file mode 100644
index 4ac9ffbe69e..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
-
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import 
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
-import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * This class aims to mark the {@link WritePlanNode} of schema writing to 
selectively forward pipe
- * schema operations, {@link Statement}s of which passing through {@link 
SchemaExecutionVisitor}.
- * The handling logic is defined in:
- *
- * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target schema 
region.
- *
- * <p>2.{@link SchemaExecutionVisitor}, to actually write data on schema 
region and mark it as
- * received from pipe.
- *
- * <p>
- *
- * <p>Notes: The content varies in all the {@link WritePlanNode}s of schema 
writing nodes:
- *
- * <p>- Timeseries(Manual): {@link CreateTimeSeriesNode}, {@link 
CreateAlignedTimeSeriesNode},
- * {@link CreateMultiTimeSeriesNode}, {@link AlterTimeSeriesNode}
- *
- * <p>- Timeseries(Auto): {@link InternalCreateTimeSeriesNode}, {@link
- * InternalCreateMultiTimeSeriesNode}
- *
- * <p>- Template(Manual): {@link ActivateTemplateNode}, {@link 
BatchActivateTemplateNode}
- *
- * <p>- Template(Auto): {@link InternalBatchActivateTemplateNode}
- *
- * <p>- LogicalView: {@link CreateLogicalViewNode}
- */
-public class PipeEnrichedWriteSchemaNode extends WritePlanNode {
-
-  private final WritePlanNode writeSchemaNode;
-
-  public PipeEnrichedWriteSchemaNode(WritePlanNode schemaWriteNode) {
-    super(schemaWriteNode.getPlanNodeId());
-    this.writeSchemaNode = schemaWriteNode;
-  }
-
-  public WritePlanNode getWriteSchemaNode() {
-    return writeSchemaNode;
-  }
-
-  @Override
-  public boolean isGeneratedByPipe() {
-    return writeSchemaNode.isGeneratedByPipe();
-  }
-
-  @Override
-  public void markAsGeneratedByPipe() {
-    writeSchemaNode.markAsGeneratedByPipe();
-  }
-
-  @Override
-  public PlanNodeId getPlanNodeId() {
-    return writeSchemaNode.getPlanNodeId();
-  }
-
-  @Override
-  public void setPlanNodeId(PlanNodeId id) {
-    writeSchemaNode.setPlanNodeId(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return writeSchemaNode.getChildren();
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    writeSchemaNode.addChild(child);
-  }
-
-  @Override
-  public WritePlanNode clone() {
-    return new PipeEnrichedWriteSchemaNode((WritePlanNode) 
writeSchemaNode.clone());
-  }
-
-  @Override
-  public WritePlanNode createSubNode(int subNodeId, int startIndex, int 
endIndex) {
-    return new PipeEnrichedWriteSchemaNode(
-        (WritePlanNode) writeSchemaNode.createSubNode(subNodeId, startIndex, 
endIndex));
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return new PipeEnrichedWriteSchemaNode(
-        (WritePlanNode) writeSchemaNode.cloneWithChildren(children));
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return writeSchemaNode.allowedChildCount();
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return writeSchemaNode.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedWriteSchema(this, context);
-  }
-
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(byteBuffer);
-    writeSchemaNode.serialize(byteBuffer);
-  }
-
-  @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(stream);
-    writeSchemaNode.serialize(stream);
-  }
-
-  public static PipeEnrichedWriteSchemaNode deserialize(ByteBuffer buffer) {
-    return new PipeEnrichedWriteSchemaNode((WritePlanNode) 
PlanNodeType.deserialize(buffer));
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return o instanceof PipeEnrichedWriteSchemaNode
-        && writeSchemaNode.equals(((PipeEnrichedWriteSchemaNode) 
o).writeSchemaNode);
-  }
-
-  @Override
-  public int hashCode() {
-    return writeSchemaNode.hashCode();
-  }
-
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return writeSchemaNode.getRegionReplicaSet();
-  }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return writeSchemaNode.splitByPartition(analysis).stream()
-        .map(
-            plan ->
-                plan instanceof PipeEnrichedWriteSchemaNode
-                    ? plan
-                    : new PipeEnrichedWriteSchemaNode(plan))
-        .collect(Collectors.toList());
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 5f0b83b64fa..0b0fe8da749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -78,6 +78,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   protected ProgressIndex progressIndex;
 
+  protected boolean isGeneratedByPipe = false;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -174,6 +176,14 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
   }
 
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
+  }
+
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+  }
+
   // region Serialization methods for WAL
   /** Serialized size of measurement schemas, ignoring failed time series */
   protected int serializeMeasurementSchemasSize() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
similarity index 89%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
index 2f2408f8097..28d217176f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -17,22 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
-import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
-import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
-import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -42,17 +38,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.stream.Collectors;
 
-/**
- * This class aims to mark the {@link InsertNode} to prevent forwarding pipe 
insertions. The
- * handling logic is defined in:
- *
- * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data 
region.
- *
- * <p>2.{@link TriggerFireVisitor}, to fire the trigger before writing to data 
region.
- *
- * <p>3.{@link DataExecutionVisitor}, to actually write data on data region 
and mark it as received
- * from pipe.
- */
 public class PipeEnrichedInsertNode extends InsertNode {
 
   private final InsertNode insertNode;
@@ -230,7 +215,7 @@ public class PipeEnrichedInsertNode extends InsertNode {
     insertNode.serialize(stream);
   }
 
-  public static PipeEnrichedInsertNode deserialize(ByteBuffer buffer) {
+  public static PlanNode deserialize(ByteBuffer buffer) {
     return new PipeEnrichedInsertNode((InsertNode) 
PlanNodeType.deserialize(buffer));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 2b1cf8a68f8..b673703649a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,6 +64,7 @@ public enum StatementType {
   BATCH_INSERT_ROWS,
   BATCH_INSERT_ONE_DEVICE,
   MULTI_BATCH_INSERT,
+  PIPE_ENRICHED_INSERT,
 
   DELETE,
 
@@ -173,6 +174,4 @@ public enum StatementType {
   DELETE_LOGICAL_VIEW,
   RENAME_LOGICAL_VIEW,
   ALTER_LOGICAL_VIEW,
-
-  PIPE_ENRICHED,
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index c824593bd15..2d34e5affba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,6 +27,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -96,7 +98,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ClearCacheStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
@@ -282,6 +283,11 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(loadTsFileStatement, context);
   }
 
+  public R visitPipeEnrichedLoadFile(
+      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C 
context) {
+    return visitStatement(pipeEnrichedLoadTsFileStatement, context);
+  }
+
   public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
     return visitStatement(insertRowStatement, context);
   }
@@ -300,8 +306,9 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(insertRowsOfOneDeviceStatement, context);
   }
 
-  public R visitPipeEnrichedStatement(PipeEnrichedStatement 
pipeEnrichedStatement, C context) {
-    return visitStatement(pipeEnrichedStatement, context);
+  /**
+   * Visits a {@link PipeEnrichedInsertBaseStatement}. The default implementation applies no
+   * special handling and falls back to {@code visitStatement}.
+   */
+  public R visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C 
context) {
+    return visitStatement(pipeEnrichedInsertBaseStatement, context);
   }
 
   /** Data Control Language (DCL) */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
new file mode 100644
index 00000000000..4b659b4ab15
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Wrapper around another {@link InsertBaseStatement} whose statement type is {@link
+ * StatementType#PIPE_ENRICHED_INSERT}.
+ *
+ * <p>Apart from {@link #accept}, which dispatches to {@link
+ * StatementVisitor#visitPipeEnrichedInsert}, every overridden method forwards to the wrapped
+ * statement, so callers observe the wrapped statement's state rather than the fields this
+ * object inherits.
+ *
+ * <p>NOTE(review): presumably used to tag inserts that arrive through a pipe so that they can
+ * be told apart from ordinary writes; confirm against the callers.
+ */
+public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
+
+  // The wrapped statement that all delegating methods below forward to.
+  private InsertBaseStatement insertBaseStatement;
+
+  /**
+   * Wraps the given statement and sets this statement's type to PIPE_ENRICHED_INSERT.
+   *
+   * @param insertBaseStatement the statement to wrap; it is stored as-is, without a null check
+   */
+  public PipeEnrichedInsertBaseStatement(InsertBaseStatement 
insertBaseStatement) {
+    statementType = StatementType.PIPE_ENRICHED_INSERT;
+    this.insertBaseStatement = insertBaseStatement;
+  }
+
+  /** Returns the wrapped statement. */
+  public InsertBaseStatement getInsertBaseStatement() {
+    return insertBaseStatement;
+  }
+
+  /** Replaces the wrapped statement; no null check is performed. */
+  public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
+    this.insertBaseStatement = insertBaseStatement;
+  }
+
+  /** Dispatches to the visitor's pipe-enriched insert handler, passing this wrapper itself. */
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedInsert(this, context);
+  }
+
+  // ---- Everything below forwards to the wrapped statement. ----
+
+  @Override
+  public boolean isDebug() {
+    return insertBaseStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    insertBaseStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    // The only delegate that tolerates a null wrapped statement (it then returns false);
+    // the other delegates dereference the field directly.
+    return !Objects.isNull(insertBaseStatement) && 
insertBaseStatement.isQuery();
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return insertBaseStatement.getDevicePath();
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    insertBaseStatement.setDevicePath(devicePath);
+  }
+
+  @Override
+  public String[] getMeasurements() {
+    return insertBaseStatement.getMeasurements();
+  }
+
+  @Override
+  public void setMeasurements(String[] measurements) {
+    insertBaseStatement.setMeasurements(measurements);
+  }
+
+  @Override
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return insertBaseStatement.getMeasurementSchemas();
+  }
+
+  @Override
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    insertBaseStatement.setMeasurementSchemas(measurementSchemas);
+  }
+
+  @Override
+  public boolean isAligned() {
+    return insertBaseStatement.isAligned();
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    insertBaseStatement.setAligned(aligned);
+  }
+
+  @Override
+  public TSDataType[] getDataTypes() {
+    return insertBaseStatement.getDataTypes();
+  }
+
+  @Override
+  public void setDataTypes(TSDataType[] dataTypes) {
+    insertBaseStatement.setDataTypes(dataTypes);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return insertBaseStatement.getPaths();
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    insertBaseStatement.updateAfterSchemaValidation();
+  }
+
+  // Calling a protected method on another instance is legal here because
+  // InsertBaseStatement is declared in this same package.
+  @Override
+  protected void selfCheckDataTypes(int index)
+      throws DataTypeMismatchException, PathNotExistException {
+    insertBaseStatement.selfCheckDataTypes(index);
+  }
+
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    insertBaseStatement.markFailedMeasurement(index, cause);
+  }
+
+  @Override
+  public boolean hasValidMeasurements() {
+    return insertBaseStatement.hasValidMeasurements();
+  }
+
+  @Override
+  public boolean hasFailedMeasurements() {
+    return insertBaseStatement.hasFailedMeasurements();
+  }
+
+  @Override
+  public int getFailedMeasurementNumber() {
+    return insertBaseStatement.getFailedMeasurementNumber();
+  }
+
+  @Override
+  public List<String> getFailedMeasurements() {
+    return insertBaseStatement.getFailedMeasurements();
+  }
+
+  @Override
+  public List<Exception> getFailedExceptions() {
+    return insertBaseStatement.getFailedExceptions();
+  }
+
+  @Override
+  public List<String> getFailedMessages() {
+    return insertBaseStatement.getFailedMessages();
+  }
+
+  @Override
+  public void setFailedMeasurementIndex2Info(
+      Map<Integer, InsertBaseStatement.FailedMeasurementInfo> 
failedMeasurementIndex2Info) {
+    
insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+  }
+
+  @Override
+  protected Map<PartialPath, List<Pair<String, Integer>>> 
getMapFromDeviceToMeasurementAndIndex() {
+    return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return insertBaseStatement.isEmpty();
+  }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    return insertBaseStatement.getSchemaValidation();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertBaseStatement.getSchemaValidationList();
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) 
{
+    return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
+  }
+
+  @Override
+  public long getMinTime() {
+    return insertBaseStatement.getMinTime();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    return insertBaseStatement.getFirstValueOfIndex(index);
+  }
+
+  /**
+   * Calls {@code removeLogicalView()} on the wrapped statement and re-wraps the result in a new
+   * {@code PipeEnrichedInsertBaseStatement}, so the returned statement keeps the pipe-enriched
+   * type. This wrapper itself is left unchanged.
+   */
+  @Override
+  public InsertBaseStatement removeLogicalView() {
+    return new 
PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
new file mode 100644
index 00000000000..c2d9f349052
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Wrapper around another {@link LoadTsFileStatement}.
+ *
+ * <p>Apart from {@link #accept}, which dispatches to {@link
+ * StatementVisitor#visitPipeEnrichedLoadFile}, and {@link #toString}, every overridden method
+ * forwards to the wrapped statement.
+ *
+ * <p>NOTE(review): presumably used to tag load-TsFile requests that arrive through a pipe so
+ * that they can be told apart from ordinary loads; confirm against the callers.
+ */
+public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
+
+  // The wrapped statement that all delegating methods below forward to.
+  private final LoadTsFileStatement loadTsFileStatement;
+
+  /**
+   * Wraps the given statement.
+   *
+   * @param loadTsFileStatement the statement to wrap; it is stored as-is, without a null check
+   */
+  public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement 
loadTsFileStatement) {
+    this.loadTsFileStatement = loadTsFileStatement;
+    // Uses the generic MULTI_BATCH_INSERT type rather than a pipe-specific statement type.
+    statementType = StatementType.MULTI_BATCH_INSERT;
+  }
+
+  /** Returns the wrapped statement. */
+  public LoadTsFileStatement getLoadTsFileStatement() {
+    return loadTsFileStatement;
+  }
+
+  // ---- Everything below, up to accept(), forwards to the wrapped statement. ----
+
+  @Override
+  public boolean isDebug() {
+    return loadTsFileStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    loadTsFileStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return loadTsFileStatement.isQuery();
+  }
+
+  @Override
+  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+    loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
+  }
+
+  @Override
+  public void setDatabaseLevel(int databaseLevel) {
+    loadTsFileStatement.setDatabaseLevel(databaseLevel);
+  }
+
+  @Override
+  public void setVerifySchema(boolean verifySchema) {
+    loadTsFileStatement.setVerifySchema(verifySchema);
+  }
+
+  @Override
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
+  }
+
+  @Override
+  public boolean isVerifySchema() {
+    return loadTsFileStatement.isVerifySchema();
+  }
+
+  @Override
+  public boolean isDeleteAfterLoad() {
+    return loadTsFileStatement.isDeleteAfterLoad();
+  }
+
+  @Override
+  public boolean isAutoCreateDatabase() {
+    return loadTsFileStatement.isAutoCreateDatabase();
+  }
+
+  @Override
+  public int getDatabaseLevel() {
+    return loadTsFileStatement.getDatabaseLevel();
+  }
+
+  @Override
+  public List<File> getTsFiles() {
+    return loadTsFileStatement.getTsFiles();
+  }
+
+  @Override
+  public void addTsFileResource(TsFileResource resource) {
+    loadTsFileStatement.addTsFileResource(resource);
+  }
+
+  @Override
+  public List<TsFileResource> getResources() {
+    return loadTsFileStatement.getResources();
+  }
+
+  @Override
+  public void addWritePointCount(long writePointCount) {
+    loadTsFileStatement.addWritePointCount(writePointCount);
+  }
+
+  @Override
+  public long getWritePointCount(int resourceIndex) {
+    return loadTsFileStatement.getWritePointCount(resourceIndex);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return loadTsFileStatement.getPaths();
+  }
+
+  /** Dispatches to the visitor's pipe-enriched load handler, passing this wrapper itself. */
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedLoadFile(this, context);
+  }
+
+  /** Returns this class's name followed by the wrapped statement's string form. */
+  @Override
+  public String toString() {
+    return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" + 
loadTsFileStatement + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
deleted file mode 100644
index 1f2080be372..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.statement.pipe;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-public class PipeEnrichedStatement extends Statement {
-
-  private Statement innerStatement;
-
-  public PipeEnrichedStatement(Statement innerStatement) {
-    statementType = StatementType.PIPE_ENRICHED;
-    this.innerStatement = innerStatement;
-  }
-
-  public Statement getInnerStatement() {
-    return innerStatement;
-  }
-
-  public void setInnerStatement(Statement innerStatement) {
-    this.innerStatement = innerStatement;
-  }
-
-  @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedStatement(this, context);
-  }
-
-  @Override
-  public boolean isDebug() {
-    return innerStatement.isDebug();
-  }
-
-  @Override
-  public void setDebug(boolean debug) {
-    innerStatement.setDebug(debug);
-  }
-
-  @Override
-  public boolean isQuery() {
-    return !Objects.isNull(innerStatement) && innerStatement.isQuery();
-  }
-
-  @Override
-  public List<PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index 8c85795f1c9..70037a99089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,7 +85,7 @@ public class DataNodeThrottleQuotaManager {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
-      case PIPE_ENRICHED:
+      case PIPE_ENRICHED_INSERT:
         return checkQuota(userName, 1, 0, s);
       case QUERY:
       case GROUP_BY_TIME:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index 5d5882430d5..b6b714b5de2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -29,7 +29,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -94,10 +95,10 @@ public class DefaultOperationQuota implements 
OperationQuota {
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, 
Statement s) {
     if (numWrites > 0) {
       long avgSize = 0;
-      if (s.getType() == StatementType.PIPE_ENRICHED) {
-        s = ((PipeEnrichedStatement) s).getInnerStatement();
-      }
-      final StatementType statementType = s.getType();
+      final StatementType statementType =
+          s.getType() == StatementType.PIPE_ENRICHED_INSERT
+              ? ((PipeEnrichedInsertBaseStatement) 
s).getInsertBaseStatement().getType()
+              : s.getType();
       switch (statementType) {
         case INSERT:
           // InsertStatement  InsertRowStatement
@@ -137,7 +138,10 @@ public class DefaultOperationQuota implements 
OperationQuota {
           }
           break;
         case MULTI_BATCH_INSERT:
-          // LoadTsFileStatement  InsertMultiTabletsStatement
+          // PipeEnrichedLoadTsFileStatement  LoadTsFileStatement  
InsertMultiTabletsStatement
+          if (s instanceof PipeEnrichedLoadTsFileStatement) {
+            s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
+          }
           if (s instanceof LoadTsFileStatement) {
             LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
             for (int i = 0; i < loadTsFileStatement.getResources().size(); 
i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index e791faad552..68f87bcc86f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -36,13 +36,13 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -253,7 +253,20 @@ public class TriggerFireVisitor extends 
PlanVisitor<TriggerFireResult, TriggerEv
   @Override
   public TriggerFireResult visitPipeEnrichedInsert(
       PipeEnrichedInsertNode node, TriggerEvent context) {
-    return node.getInsertNode().accept(this, context);
+    // Unwrap the pipe-enriched node and pick the handler from the concrete type of the
+    // wrapped insert node, instead of going through the node's own accept(...).
+    final InsertNode realInsertNode = node.getInsertNode();
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else {
+      // None of the insert node types checked above: use the generic plan handler.
+      return visitPlan(realInsertNode, context);
+    }
   }
 
   private Map<String, Integer> constructMeasurementToSchemaIndexMap(

Reply via email to