This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4c6a4df91 Pipe Schema: Receiver Agent: Added pipe enriched planNode 
to enable pipe request detection to configure "forwarding-pipe-request" (#11672)
3e4c6a4df91 is described below

commit 3e4c6a4df918513cb1c002614da3837090a4dfc8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 14 21:14:58 2023 +0800

    Pipe Schema: Receiver Agent: Added pipe enriched planNode to enable pipe 
request detection to configure "forwarding-pipe-request" (#11672)
    
    * Squashed the pipe enriched statements into a single one.
    * Expanded the pipe enriched nodes to Insert, Delete, WriteSchema, 
ConfigSchema (detailed content in the notes)
    * Simplified the logic to execute the enriched nodes.
    * Changed AlterLogicalViewNode from WritePlanNode to PlanNode since it's 
directly assigned to regions by configNode.
    * Fixed DefaultOperationQuota to avoid potential ClassCastException.
---
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   2 +-
 .../dataregion/DataExecutionVisitor.java           |  29 +--
 .../schemaregion/SchemaExecutionVisitor.java       |  14 ++
 .../protocol/writeback/WriteBackConnector.java     |   4 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  10 +-
 .../execution/executor/RegionWriteExecutor.java    | 276 ++++++++++++++++++---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  42 +---
 .../queryengine/plan/execution/QueryExecution.java |  10 +-
 .../plan/planner/LogicalPlanVisitor.java           |  56 ++---
 .../plan/planner/plan/node/PlanNode.java           |  10 +
 .../plan/planner/plan/node/PlanNodeType.java       |  18 +-
 .../plan/planner/plan/node/PlanVisitor.java        |  23 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   7 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |   2 +-
 .../metedata/write/view/AlterLogicalViewNode.java  |  43 +---
 .../node/pipe/PipeEnrichedConfigSchemaNode.java    | 161 ++++++++++++
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 169 +++++++++++++
 .../{write => pipe}/PipeEnrichedInsertNode.java    |  19 +-
 .../node/pipe/PipeEnrichedWriteSchemaNode.java     | 192 ++++++++++++++
 .../plan/planner/plan/node/write/InsertNode.java   |  10 -
 .../queryengine/plan/statement/StatementType.java  |   3 +-
 .../plan/statement/StatementVisitor.java           |  13 +-
 .../crud/PipeEnrichedInsertBaseStatement.java      | 220 ----------------
 .../crud/PipeEnrichedLoadTsFileStatement.java      | 137 ----------
 .../plan/statement/pipe/PipeEnrichedStatement.java |  72 ++++++
 .../quotas/DataNodeThrottleQuotaManager.java       |   2 +-
 .../rescon/quotas/DefaultOperationQuota.java       |  16 +-
 .../db/trigger/executor/TriggerFireVisitor.java    |  17 +-
 28 files changed, 982 insertions(+), 595 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index ed503128aee..4f6cecb32a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -215,7 +215,7 @@ public class AuditLogger {
       case BATCH_INSERT_ROWS:
       case BATCH_INSERT_ONE_DEVICE:
       case MULTI_BATCH_INSERT:
-      case PIPE_ENRICHED_INSERT:
+      case PIPE_ENRICHED:
       case DELETE:
       case SELECT_INTO:
       case LOAD_FILES:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 53813fb93ea..845532bc96d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -27,14 +27,14 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -169,23 +169,8 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
 
   @Override
   public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, 
DataRegion context) {
-    final InsertNode realInsertNode = node.getInsertNode();
-
-    realInsertNode.markAsGeneratedByPipe();
-
-    if (realInsertNode instanceof InsertRowNode) {
-      return visitInsertRow((InsertRowNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertTabletNode) {
-      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertRowsNode) {
-      return visitInsertRows((InsertRowsNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
-      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
-    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
-      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
-    } else {
-      return visitPlan(realInsertNode, context);
-    }
+    node.getInsertNode().markAsGeneratedByPipe();
+    return node.getInsertNode().accept(this, context);
   }
 
   @Override
@@ -215,4 +200,10 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
     }
   }
+
+  @Override
+  public TSStatus visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, 
DataRegion context) {
+    node.getDeleteDataNode().markAsGeneratedByPipe();
+    return visitDeleteData((DeleteDataNode) node.getDeleteDataNode(), context);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 904863ddcc6..a1496e7b6d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -50,6 +50,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
@@ -520,6 +522,18 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
     }
   }
 
+  @Override
+  public TSStatus visitPipeEnrichedWriteSchema(
+      PipeEnrichedWriteSchemaNode node, ISchemaRegion schemaRegion) {
+    return node.getWriteSchemaNode().accept(this, schemaRegion);
+  }
+
+  @Override
+  public TSStatus visitPipeEnrichedConfigSchema(
+      PipeEnrichedConfigSchemaNode node, ISchemaRegion schemaRegion) {
+    return node.getConfigSchemaNode().accept(this, schemaRegion);
+  }
+
   @Override
   public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
     return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index eb262704c68..9a2883e6935 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -159,7 +159,7 @@ public class WriteBackConnector implements PipeConnector {
   private TSStatus executeStatement(InsertBaseStatement statement) {
     return Coordinator.getInstance()
         .execute(
-            new PipeEnrichedInsertBaseStatement(statement),
+            new PipeEnrichedStatement(statement),
             SessionManager.getInstance().requestQueryId(),
             new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
             "",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 03c58874292..128d59af02b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -48,13 +48,11 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -556,11 +554,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    if (statement instanceof InsertBaseStatement) {
-      statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) 
statement);
-    } else if (statement instanceof LoadTsFileStatement) {
-      statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement) 
statement);
-    }
+    statement = new PipeEnrichedStatement(statement);
 
     final ExecutionResult result =
         Coordinator.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 8c53c63bddb..7c6348fdbb4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -52,6 +52,9 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -59,7 +62,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -104,6 +106,10 @@ public class RegionWriteExecutor {
 
   private final TriggerFireVisitor triggerFireVisitor;
 
+  private final WritePlanNodeExecutionVisitor executionVisitor;
+
+  private final PipeEnrichedWriteSchemaNodeExecutionVisitor 
pipeExecutionVisitor;
+
   public RegionWriteExecutor() {
     dataRegionConsensus = DataRegionConsensusImpl.getInstance();
     schemaRegionConsensus = SchemaRegionConsensusImpl.getInstance();
@@ -111,6 +117,8 @@ public class RegionWriteExecutor {
     schemaEngine = SchemaEngine.getInstance();
     clusterTemplateManager = ClusterTemplateManager.getInstance();
     triggerFireVisitor = new TriggerFireVisitor();
+    executionVisitor = new WritePlanNodeExecutionVisitor();
+    pipeExecutionVisitor = new 
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
   }
 
   @TestOnly
@@ -127,6 +135,8 @@ public class RegionWriteExecutor {
     this.schemaEngine = schemaEngine;
     this.clusterTemplateManager = clusterTemplateManager;
     this.triggerFireVisitor = triggerFireVisitor;
+    executionVisitor = new WritePlanNodeExecutionVisitor();
+    pipeExecutionVisitor = new 
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
   }
 
   @SuppressWarnings("squid:S1181")
@@ -134,7 +144,6 @@ public class RegionWriteExecutor {
     try {
       WritePlanNodeExecutionContext context =
           new WritePlanNodeExecutionContext(groupId, 
regionManager.getRegionLock(groupId));
-      WritePlanNodeExecutionVisitor executionVisitor = new 
WritePlanNodeExecutionVisitor();
       return planNode.accept(executionVisitor, context);
     } catch (Throwable e) {
       LOGGER.error(e.getMessage(), e);
@@ -246,13 +255,13 @@ public class RegionWriteExecutor {
       }
     }
 
-    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode 
planNode)
+    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode 
insertNode)
         throws ConsensusException {
       long triggerCostTime = 0;
       TSStatus status;
       long startTime = System.nanoTime();
       // fire Trigger before the insertion
-      TriggerFireResult result = triggerFireVisitor.process(planNode, 
TriggerEvent.BEFORE_INSERT);
+      TriggerFireResult result = triggerFireVisitor.process(insertNode, 
TriggerEvent.BEFORE_INSERT);
       triggerCostTime += (System.nanoTime() - startTime);
       if (result.equals(TriggerFireResult.TERMINATION)) {
         status =
@@ -261,14 +270,14 @@ public class RegionWriteExecutor {
                 "Failed to complete the insertion because trigger error before 
the insertion.");
       } else {
         long startWriteTime = System.nanoTime();
-        status = dataRegionConsensus.write(groupId, planNode);
+        status = dataRegionConsensus.write(groupId, insertNode);
         
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - 
startWriteTime);
 
         // fire Trigger after the insertion
         startTime = System.nanoTime();
         boolean hasFailedTriggerBeforeInsertion =
             result.equals(TriggerFireResult.FAILED_NO_TERMINATION);
-        result = triggerFireVisitor.process(planNode, 
TriggerEvent.AFTER_INSERT);
+        result = triggerFireVisitor.process(insertNode, 
TriggerEvent.AFTER_INSERT);
         if (hasFailedTriggerBeforeInsertion || 
!result.equals(TriggerFireResult.SUCCESS)) {
           status =
               RpcUtils.getStatus(
@@ -281,6 +290,18 @@ public class RegionWriteExecutor {
       return status;
     }
 
+    @Override
+    public RegionExecutionResult visitPipeEnrichedDeleteData(
+        PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext 
context) {
+      // data deletion should block data insertion, especially when executed 
for deleting timeseries
+      context.getRegionWriteValidationRWLock().writeLock().lock();
+      try {
+        return super.visitPipeEnrichedDeleteData(node, context);
+      } finally {
+        context.getRegionWriteValidationRWLock().writeLock().unlock();
+      }
+    }
+
     @Override
     public RegionExecutionResult visitDeleteData(
         DeleteDataNode node, WritePlanNodeExecutionContext context) {
@@ -296,6 +317,13 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitCreateTimeSeries(
         CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      return executeCreateTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeCreateTimeSeries(
+        CreateTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -312,7 +340,9 @@ public class RegionWriteExecutor {
                   Collections.singletonList(node.getPath().getMeasurement()),
                   Collections.singletonList(node.getAlias()));
           if (failingMeasurementMap.isEmpty()) {
-            return super.visitCreateTimeSeries(node, context);
+            return receivedFromPipe
+                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+                : super.visitCreateTimeSeries(node, context);
           } else {
             MetadataException metadataException = failingMeasurementMap.get(0);
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -328,13 +358,22 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitCreateTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitCreateTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitCreateAlignedTimeSeries(
         CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return executeCreateAlignedTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeCreateAlignedTimeSeries(
+        CreateAlignedTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -350,7 +389,9 @@ public class RegionWriteExecutor {
               schemaRegion.checkMeasurementExistence(
                   node.getDevicePath(), node.getMeasurements(), 
node.getAliasList());
           if (failingMeasurementMap.isEmpty()) {
-            return super.visitCreateAlignedTimeSeries(node, context);
+            return receivedFromPipe
+                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+                : super.visitCreateAlignedTimeSeries(node, context);
           } else {
             MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -366,13 +407,22 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitCreateAlignedTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitCreateAlignedTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitCreateMultiTimeSeries(
         CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
+      return executeCreateMultiTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result;
@@ -400,7 +450,8 @@ public class RegionWriteExecutor {
           }
 
           RegionExecutionResult failingResult =
-              registerTimeSeries(measurementGroupMap, node, context, 
failingStatus);
+              registerTimeSeries(
+                  measurementGroupMap, node, context, failingStatus, 
receivedFromPipe);
 
           if (failingResult != null) {
             return failingResult;
@@ -416,7 +467,9 @@ public class RegionWriteExecutor {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitCreateMultiTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitCreateMultiTimeSeries(node, context);
       }
     }
 
@@ -455,10 +508,14 @@ public class RegionWriteExecutor {
         Map<PartialPath, MeasurementGroup> measurementGroupMap,
         CreateMultiTimeSeriesNode node,
         WritePlanNodeExecutionContext context,
-        List<TSStatus> failingStatus) {
+        List<TSStatus> failingStatus,
+        boolean receivedFromPipe) {
       if (!measurementGroupMap.isEmpty()) {
         // try registering the rest timeseries
-        RegionExecutionResult executionResult = 
super.visitCreateMultiTimeSeries(node, context);
+        RegionExecutionResult executionResult =
+            receivedFromPipe
+                ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+                : super.visitCreateMultiTimeSeries(node, context);
         if (failingStatus.isEmpty()) {
           return executionResult;
         }
@@ -476,6 +533,13 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitInternalCreateTimeSeries(
         InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return executeInternalCreateTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeInternalCreateTimeSeries(
+        InternalCreateTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result =
@@ -519,20 +583,32 @@ public class RegionWriteExecutor {
           measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
 
           return processExecutionResultOfInternalCreateSchema(
-              super.visitInternalCreateTimeSeries(node, context),
+              receivedFromPipe
+                  ? super.visitPipeEnrichedWriteSchema(
+                      new PipeEnrichedWriteSchemaNode(node), context)
+                  : super.visitInternalCreateTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitInternalCreateTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitInternalCreateTimeSeries(node, context);
       }
     }
 
     @Override
     public RegionExecutionResult visitInternalCreateMultiTimeSeries(
         InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return executeInternalCreateMultiTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeInternalCreateMultiTimeSeries(
+        InternalCreateMultiTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result;
@@ -586,14 +662,19 @@ public class RegionWriteExecutor {
           }
 
           return processExecutionResultOfInternalCreateSchema(
-              super.visitInternalCreateMultiTimeSeries(node, context),
+              receivedFromPipe
+                  ? super.visitPipeEnrichedWriteSchema(
+                      new PipeEnrichedWriteSchemaNode(node), context)
+                  : super.visitInternalCreateMultiTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitInternalCreateMultiTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitInternalCreateMultiTimeSeries(node, context);
       }
     }
 
@@ -676,17 +757,22 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitAlterTimeSeries(
         AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      return executeAlterTimeSeries(node, context, false);
+    }
+
+    private RegionExecutionResult executeAlterTimeSeries(
+        AlterTimeSeriesNode node, WritePlanNodeExecutionContext context, 
boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       try {
         MeasurementPath measurementPath = 
schemaRegion.fetchMeasurementPath(node.getPath());
-        if (node.isAlterView()) {
-          if (!measurementPath.getMeasurementSchema().isLogicalView()) {
-            throw new MetadataException(
-                String.format("%s is not view.", 
measurementPath.getFullPath()));
-          }
+        if (node.isAlterView() && 
!measurementPath.getMeasurementSchema().isLogicalView()) {
+          throw new MetadataException(
+              String.format("%s is not view.", measurementPath.getFullPath()));
         }
-        return super.visitAlterTimeSeries(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitAlterTimeSeries(node, context);
       } catch (MetadataException e) {
         RegionExecutionResult result = new RegionExecutionResult();
         result.setAccepted(true);
@@ -699,6 +785,13 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitActivateTemplate(
         ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+      return executeActivateTemplate(node, context, false);
+    }
+
+    private RegionExecutionResult executeActivateTemplate(
+        ActivateTemplateNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -722,7 +815,13 @@ public class RegionWriteExecutor {
         RegionExecutionResult result =
             checkQuotaBeforeCreatingTimeSeries(
                 schemaRegion, node.getActivatePath(), 
templateSetInfo.left.getMeasurementNumber());
-        return result == null ? super.visitActivateTemplate(node, context) : 
result;
+        if (result == null) {
+          return receivedFromPipe
+              ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+              : super.visitActivateTemplate(node, context);
+        } else {
+          return result;
+        }
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -731,6 +830,13 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitBatchActivateTemplate(
         BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) 
{
+      return executeBatchActivateTemplate(node, context, false);
+    }
+
+    private RegionExecutionResult executeBatchActivateTemplate(
+        BatchActivateTemplateNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -760,7 +866,9 @@ public class RegionWriteExecutor {
           }
         }
 
-        return super.visitBatchActivateTemplate(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitBatchActivateTemplate(node, context);
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -769,6 +877,13 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitInternalBatchActivateTemplate(
         InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext 
context) {
+      return executeInternalBatchActivateTemplate(node, context, false);
+    }
+
+    private RegionExecutionResult executeInternalBatchActivateTemplate(
+        InternalBatchActivateTemplateNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
@@ -801,7 +916,9 @@ public class RegionWriteExecutor {
           }
         }
 
-        return super.visitInternalBatchActivateTemplate(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitInternalBatchActivateTemplate(node, context);
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -810,12 +927,19 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitCreateLogicalView(
         CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
+      return executeCreateLogicalView(node, context, false);
+    }
+
+    private RegionExecutionResult executeCreateLogicalView(
+        CreateLogicalViewNode node,
+        WritePlanNodeExecutionContext context,
+        boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          // step 1. make sure all target paths are NOT exist.
+          // step 1. make sure all target paths do NOT exist.
           List<PartialPath> targetPaths = node.getViewPathList();
           List<MetadataException> failingMetadataException = new ArrayList<>();
           for (PartialPath thisPath : targetPaths) {
@@ -825,12 +949,12 @@ public class RegionWriteExecutor {
                     thisPath.getDevicePath(),
                     Collections.singletonList(thisPath.getMeasurement()),
                     null);
-            // merge all exception into one map
+            // merge all exceptions into one map
             for (Map.Entry<Integer, MetadataException> entry : 
failingMeasurementMap.entrySet()) {
               failingMetadataException.add(entry.getValue());
             }
           }
-          // if there is some exception, handle each exception and return 
first of them.
+          // if there are some exceptions, handle each exception and return 
first of them.
           if (!failingMetadataException.isEmpty()) {
             MetadataException metadataException = 
failingMetadataException.get(0);
             LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -842,16 +966,102 @@ public class RegionWriteExecutor {
                     metadataException.getErrorCode(), 
metadataException.getMessage()));
             return result;
           }
-          // step 2. make sure all source paths are existed.
-          return super.visitCreateLogicalView(node, context);
+          // step 2. make sure all source paths exist.
+          return receivedFromPipe
+              ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+              : super.visitCreateLogicalView(node, context);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
       } else {
-        return super.visitCreateLogicalView(node, context);
+        return receivedFromPipe
+            ? super.visitPipeEnrichedWriteSchema(new 
PipeEnrichedWriteSchemaNode(node), context)
+            : super.visitCreateLogicalView(node, context);
       }
       // end of visitCreateLogicalView
     }
+
+    @Override
+    public RegionExecutionResult visitPipeEnrichedWriteSchema(
+        PipeEnrichedWriteSchemaNode node, WritePlanNodeExecutionContext 
context) {
+      return node.getWriteSchemaNode().accept(pipeExecutionVisitor, context);
+    }
+  }
+
+  private class PipeEnrichedWriteSchemaNodeExecutionVisitor
+      extends PlanVisitor<RegionExecutionResult, 
WritePlanNodeExecutionContext> {
+
+    WritePlanNodeExecutionVisitor visitor;
+
+    private 
PipeEnrichedWriteSchemaNodeExecutionVisitor(WritePlanNodeExecutionVisitor 
visitor) {
+      this.visitor = visitor;
+    }
+
+    @Override
+    public RegionExecutionResult visitPlan(PlanNode node, 
WritePlanNodeExecutionContext context) {
+      throw new UnsupportedOperationException(
+          "PipeEnrichedWriteSchemaNodeExecutionVisitor does not support 
visiting general plan.");
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateTimeSeries(
+        CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      return visitor.executeCreateTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateAlignedTimeSeries(
+        CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return visitor.executeCreateAlignedTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
+      return visitor.executeCreateMultiTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitInternalCreateTimeSeries(
+        InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return visitor.executeInternalCreateTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitInternalCreateMultiTimeSeries(
+        InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      return visitor.executeInternalCreateMultiTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitAlterTimeSeries(
+        AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      return visitor.executeAlterTimeSeries(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitActivateTemplate(
+        ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+      return visitor.executeActivateTemplate(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitBatchActivateTemplate(
+        BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) 
{
+      return visitor.executeBatchActivateTemplate(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitInternalBatchActivateTemplate(
+        InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext 
context) {
+      return visitor.executeInternalBatchActivateTemplate(node, context, true);
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateLogicalView(
+        CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
+      return visitor.executeCreateLogicalView(node, context, true);
+    }
   }
 
   private static class WritePlanNodeExecutionContext {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 89842c3d1af..6ddf58895ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -100,8 +100,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -133,6 +131,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPath
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
@@ -2570,33 +2569,13 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   }
 
   @Override
-  public Analysis visitPipeEnrichedInsert(
-      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
-    Analysis analysis;
-
-    final InsertBaseStatement insertBaseStatement =
-        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
-    if (insertBaseStatement instanceof InsertTabletStatement) {
-      analysis = visitInsertTablet((InsertTabletStatement) 
insertBaseStatement, context);
-    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
-      analysis =
-          visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
-    } else if (insertBaseStatement instanceof InsertRowStatement) {
-      analysis = visitInsertRow((InsertRowStatement) insertBaseStatement, 
context);
-    } else if (insertBaseStatement instanceof InsertRowsStatement) {
-      analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement, 
context);
-    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
-      analysis =
-          visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement) 
insertBaseStatement, context);
-    } else {
-      throw new UnsupportedOperationException(
-          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
-    }
+  public Analysis visitPipeEnrichedStatement(
+      PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+    Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
 
     // statement may be changed because of logical view
-    pipeEnrichedInsertBaseStatement.setInsertBaseStatement(
-        (InsertBaseStatement) analysis.getStatement());
-    analysis.setStatement(pipeEnrichedInsertBaseStatement);
+    pipeEnrichedStatement.setInnerStatement(analysis.getStatement());
+    analysis.setStatement(pipeEnrichedStatement);
     return analysis;
   }
 
@@ -2653,15 +2632,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         .analyzeFileByFile();
   }
 
-  @Override
-  public Analysis visitPipeEnrichedLoadFile(
-      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
-    final Analysis analysis =
-        
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
-    analysis.setStatement(pipeEnrichedLoadTsFileStatement);
-    return analysis;
-  }
-
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams, String userName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index ce33e169b36..9af8cfa867b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -65,7 +65,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -318,7 +318,11 @@ public class QueryExecution implements IQueryExecution {
 
   private void schedule() {
     final long startTime = System.nanoTime();
-    if (rawStatement instanceof LoadTsFileStatement) {
+    boolean isPipeEnrichedTsFileLoad =
+        rawStatement instanceof PipeEnrichedStatement
+            && ((PipeEnrichedStatement) rawStatement).getInnerStatement()
+                instanceof LoadTsFileStatement;
+    if (rawStatement instanceof LoadTsFileStatement || 
isPipeEnrichedTsFileLoad) {
       this.scheduler =
           new LoadTsFileScheduler(
               distributedPlan,
@@ -326,7 +330,7 @@ public class QueryExecution implements IQueryExecution {
               stateMachine,
               syncInternalServiceClientManager,
               partitionFetcher,
-              rawStatement instanceof PipeEnrichedLoadTsFileStatement);
+              isPipeEnrichedTsFileLoad);
       this.scheduler.start();
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 090c2a55a6f..e55521ea107 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -43,6 +44,9 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -51,20 +55,16 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -87,6 +87,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -558,37 +559,20 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   }
 
   @Override
-  public PlanNode visitPipeEnrichedInsert(
-      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, 
MPPQueryContext context) {
-    InsertNode insertNode;
-
-    final InsertBaseStatement insertBaseStatement =
-        pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
-    if (insertBaseStatement instanceof InsertRowStatement) {
-      insertNode =
-          (InsertRowNode) visitInsertRow((InsertRowStatement) 
insertBaseStatement, context);
-    } else if (insertBaseStatement instanceof InsertRowsStatement) {
-      insertNode =
-          (InsertRowsNode) visitInsertRows((InsertRowsStatement) 
insertBaseStatement, context);
-    } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
-      insertNode =
-          (InsertRowsOfOneDeviceNode)
-              visitInsertRowsOfOneDevice(
-                  (InsertRowsOfOneDeviceStatement) insertBaseStatement, 
context);
-    } else if (insertBaseStatement instanceof InsertTabletStatement) {
-      insertNode =
-          (InsertTabletNode)
-              visitInsertTablet((InsertTabletStatement) insertBaseStatement, 
context);
-    } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
-      insertNode =
-          (InsertMultiTabletsNode)
-              visitInsertMultiTablets((InsertMultiTabletsStatement) 
insertBaseStatement, context);
-    } else {
-      throw new UnsupportedOperationException(
-          "Unsupported insert statement type: " + 
insertBaseStatement.getClass().getName());
+  public PlanNode visitPipeEnrichedStatement(
+      PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+    WritePlanNode node =
+        (WritePlanNode) pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
+
+    if (node instanceof LoadTsFileNode) {
+      return node;
+    } else if (node instanceof InsertNode) {
+      return new PipeEnrichedInsertNode((InsertNode) node);
+    } else if (node instanceof DeleteDataNode) {
+      return new PipeEnrichedDeleteDataNode((DeleteDataNode) node);
     }
 
-    return new PipeEnrichedInsertNode(insertNode);
+    return new PipeEnrichedWriteSchemaNode(node);
   }
 
   @Override
@@ -597,12 +581,6 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
         context.getQueryId().genPlanNodeId(), 
loadTsFileStatement.getResources());
   }
 
-  @Override
-  public PlanNode visitPipeEnrichedLoadFile(
-      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, 
MPPQueryContext context) {
-    return 
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), 
context);
-  }
-
   @Override
   public PlanNode visitShowTimeSeries(
       ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 2b73576ea03..18f678974b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -46,6 +46,8 @@ public abstract class PlanNode implements IConsensusRequest {
 
   protected PlanNodeId id;
 
+  protected boolean isGeneratedByPipe = false;
+
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
     this.id = id;
@@ -59,6 +61,14 @@ public abstract class PlanNode implements IConsensusRequest {
     this.id = id;
   }
 
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
+  }
+
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+  }
+
   public abstract List<PlanNode> getChildren();
 
   public abstract void addChild(PlanNode child);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index ea819b466f5..a17ea3493fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -56,6 +56,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -97,7 +101,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
@@ -189,7 +192,11 @@ public enum PlanNodeType {
 
   LAST_QUERY_TRANSFORM((short) 81),
   TOP_K((short) 82),
-  COLUMN_INJECT((short) 83);
+  COLUMN_INJECT((short) 83),
+  PIPE_ENRICHED_DELETE_DATA((short) 84),
+  PIPE_ENRICHED_WRITE_SCHEMA((short) 85),
+  PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
+  ;
 
   public static final int BYTES = Short.BYTES;
 
@@ -404,6 +411,13 @@ public enum PlanNodeType {
         return TopKNode.deserialize(buffer);
       case 83:
         return ColumnInjectNode.deserialize(buffer);
+      case 84:
+        return PipeEnrichedDeleteDataNode.deserialize(buffer);
+      case 85:
+        return PipeEnrichedWriteSchemaNode.deserialize(buffer);
+      case 86:
+        return PipeEnrichedConfigSchemaNode.deserialize(buffer);
+
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 9dbb3df3b57..104242486d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -53,6 +53,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -97,7 +101,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 
 public abstract class PlanVisitor<R, C> {
 
@@ -439,11 +442,27 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitDeleteData(DeleteDataNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+  // Pipe Enriched Node
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+
   public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitDeleteData(DeleteDataNode node, C context) {
+  public R visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, C 
context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitPipeEnrichedWriteSchema(PipeEnrichedWriteSchemaNode node, C 
context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitPipeEnrichedConfigSchema(PipeEnrichedConfigSchemaNode node, C 
context) {
     return visitPlan(node, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 23a5620a611..997f5bd89e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
@@ -88,7 +89,11 @@ public class LoadTsFileNode extends WritePlanNode {
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     List<WritePlanNode> res = new ArrayList<>();
-    LoadTsFileStatement statement = (LoadTsFileStatement) 
analysis.getStatement();
+    LoadTsFileStatement statement =
+        analysis.getStatement() instanceof PipeEnrichedStatement
+            ? (LoadTsFileStatement)
+                ((PipeEnrichedStatement) 
analysis.getStatement()).getInnerStatement()
+            : (LoadTsFileStatement) analysis.getStatement();
 
     for (int i = 0; i < resources.size(); i++) {
       res.add(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 43b8f9c6510..060acc2f9ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -187,7 +187,7 @@ public class CreateTimeSeriesNode extends WritePlanNode 
implements ICreateTimeSe
 
   public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
     String id;
-    PartialPath path = null;
+    PartialPath path;
     TSDataType dataType;
     TSEncoding encoding;
     CompressionType compressor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
index 4122ebfb461..664575e1bf7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
@@ -19,16 +19,13 @@
 
 package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -41,7 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class AlterLogicalViewNode extends WritePlanNode {
+public class AlterLogicalViewNode extends PlanNode {
 
   /**
    * A map from target path to source expression. Yht target path is the name 
of this logical view,
@@ -49,12 +46,6 @@ public class AlterLogicalViewNode extends WritePlanNode {
    */
   private final Map<PartialPath, ViewExpression> viewPathToSourceMap;
 
-  /**
-   * This variable will be set in function splitByPartition() according to 
analysis. And it will be
-   * set when creating new split nodes.
-   */
-  private final TRegionReplicaSet regionReplicaSet = null;
-
   public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression> 
viewPathToSourceMap) {
     super(id);
     this.viewPathToSourceMap = viewPathToSourceMap;
@@ -71,11 +62,6 @@ public class AlterLogicalViewNode extends WritePlanNode {
     return visitor.visitAlterLogicalView(this, context);
   }
 
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return this.regionReplicaSet;
-  }
-
   @Override
   public List<PlanNode> getChildren() {
     return new ArrayList<>();
@@ -160,32 +146,5 @@ public class AlterLogicalViewNode extends WritePlanNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap);
   }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new 
HashMap<>();
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
this.viewPathToSourceMap.entrySet()) {
-      // for each entry in the map for target path to source expression,
-      // build a map from TRegionReplicaSet to this entry.
-      // Please note that getSchemaRegionReplicaSet needs a device path as 
parameter.
-      TRegionReplicaSet regionReplicaSet =
-          
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getDevice());
-
-      // create a map if the key(regionReplicaSet) is not exists,
-      // then put this entry into this map(from regionReplicaSet to this entry)
-      splitMap
-          .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
-          .put(entry.getKey(), entry.getValue());
-    }
-
-    // split this node into several nodes according to their regionReplicaSet
-    List<WritePlanNode> result = new ArrayList<>();
-    for (Map.Entry<TRegionReplicaSet, Map<PartialPath, ViewExpression>> entry :
-        splitMap.entrySet()) {
-      // for each entry in splitMap, create a plan node.
-      result.add(new CreateLogicalViewNode(getPlanNodeId(), entry.getValue(), 
entry.getKey()));
-    }
-    return result;
-  }
   // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
new file mode 100644
index 00000000000..1ff6cabb7a2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import 
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * This class aims to mark the {@link PlanNode} of schema operation assigned 
by configNode to
+ * selectively forward the operations, {@link Statement}s of which extending 
{@link
+ * IConfigStatement}. The handling logic is defined only in {@link 
SchemaExecutionVisitor} to
+ * execute deletion logic and mark it as received, since the request does not 
need to pass {@link
+ * RegionWriteExecutor} and is assigned directly to regions by configNode.
+ *
+ * <p>
+ *
+ * <p>Notes: The contents includes all the collected {@link PlanNode}s of 
schema deletion:
+ *
+ * <p>- Timeseries: {@link DeleteTimeSeriesNode}
+ *
+ * <p>- Template: {@link DeactivateTemplateNode}
+ *
+ * <p>- LogicalView: {@link AlterLogicalViewNode}, {@link 
DeleteLogicalViewNode}
+ *
+ * <p>Intermediate nodes like {@link ConstructSchemaBlackListNode} will not be 
included since they
+ * will not be collected by pipe.
+ */
+public class PipeEnrichedConfigSchemaNode extends PlanNode {
+
+  private final PlanNode configSchemaNode;
+
+  public PipeEnrichedConfigSchemaNode(PlanNode configSchemaNode) {
+    super(configSchemaNode.getPlanNodeId());
+    this.configSchemaNode = configSchemaNode;
+  }
+
+  public PlanNode getConfigSchemaNode() {
+    return configSchemaNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return configSchemaNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    configSchemaNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return configSchemaNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(PlanNodeId id) {
+    configSchemaNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return configSchemaNode.getChildren();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    configSchemaNode.addChild(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new PipeEnrichedConfigSchemaNode(configSchemaNode.clone());
+  }
+
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new PipeEnrichedConfigSchemaNode(
+        configSchemaNode.createSubNode(subNodeId, startIndex, endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    return new 
PipeEnrichedConfigSchemaNode(configSchemaNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return configSchemaNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return configSchemaNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedConfigSchema(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(byteBuffer);
+    configSchemaNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(stream);
+    configSchemaNode.serialize(stream);
+  }
+
+  public static PipeEnrichedConfigSchemaNode deserialize(ByteBuffer buffer) {
+    return new PipeEnrichedConfigSchemaNode(PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PipeEnrichedConfigSchemaNode
+        && configSchemaNode.equals(((PipeEnrichedConfigSchemaNode) 
o).configSchemaNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return configSchemaNode.hashCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
new file mode 100644
index 00000000000..29d2e011c58
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class aims to mark the {@link DeleteDataNode} to enable selectively 
forwarding pipe
+ * deletion. The handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data 
region.
+ *
+ * <p>2.{@link DataExecutionVisitor}, to actually write data on data region 
and mark it as received
+ * from pipe.
+ */
+public class PipeEnrichedDeleteDataNode extends DeleteDataNode {
+
+  private final DeleteDataNode deleteDataNode;
+
+  public PipeEnrichedDeleteDataNode(DeleteDataNode deleteDataNode) {
+    super(
+        deleteDataNode.getPlanNodeId(),
+        deleteDataNode.getPathList(),
+        deleteDataNode.getDeleteStartTime(),
+        deleteDataNode.getDeleteEndTime());
+    this.deleteDataNode = deleteDataNode;
+  }
+
+  public PlanNode getDeleteDataNode() {
+    return deleteDataNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return deleteDataNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    deleteDataNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return deleteDataNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(PlanNodeId id) {
+    deleteDataNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return deleteDataNode.getChildren();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    deleteDataNode.addChild(child);
+  }
+
+  @Override
+  public DeleteDataNode clone() {
+    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
deleteDataNode.clone());
+  }
+
+  @Override
+  public DeleteDataNode createSubNode(int subNodeId, int startIndex, int 
endIndex) {
+    return new PipeEnrichedDeleteDataNode(
+        (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, 
endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    return new PipeEnrichedDeleteDataNode(
+        (DeleteDataNode) deleteDataNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return deleteDataNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return deleteDataNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedDeleteData(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(byteBuffer);
+    deleteDataNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(stream);
+    deleteDataNode.serialize(stream);
+  }
+
+  public static PipeEnrichedDeleteDataNode deserialize(ByteBuffer buffer) {
+    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PipeEnrichedDeleteDataNode
+        && deleteDataNode.equals(((PipeEnrichedDeleteDataNode) 
o).deleteDataNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return deleteDataNode.hashCode();
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return deleteDataNode.getRegionReplicaSet();
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    return deleteDataNode.splitByPartition(analysis).stream()
+        .map(
+            plan ->
+                plan instanceof PipeEnrichedDeleteDataNode
+                    ? plan
+                    : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
similarity index 89%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 28d217176f4..2f2408f8097 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -17,18 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
+import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
+import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -38,6 +42,17 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * This class aims to mark the {@link InsertNode} to prevent forwarding pipe 
insertions. The
+ * handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data 
region.
+ *
+ * <p>2.{@link TriggerFireVisitor}, to fire the trigger before writing to data 
region.
+ *
+ * <p>3.{@link DataExecutionVisitor}, to actually write data on data region 
and mark it as received
+ * from pipe.
+ */
 public class PipeEnrichedInsertNode extends InsertNode {
 
   private final InsertNode insertNode;
@@ -215,7 +230,7 @@ public class PipeEnrichedInsertNode extends InsertNode {
     insertNode.serialize(stream);
   }
 
-  public static PlanNode deserialize(ByteBuffer buffer) {
+  public static PipeEnrichedInsertNode deserialize(ByteBuffer buffer) {
     return new PipeEnrichedInsertNode((InsertNode) 
PlanNodeType.deserialize(buffer));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
new file mode 100644
index 00000000000..4ac9ffbe69e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import 
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class aims to mark the {@link WritePlanNode} of schema writing to 
selectively forward pipe
+ * schema operations, {@link Statement}s of which passing through {@link 
SchemaExecutionVisitor}.
+ * The handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target schema 
region.
+ *
+ * <p>2.{@link SchemaExecutionVisitor}, to actually write data on schema 
region and mark it as
+ * received from pipe.
+ *
+ * <p>
+ *
+ * <p>Notes: The content varies in all the {@link WritePlanNode}s of schema 
writing nodes:
+ *
+ * <p>- Timeseries(Manual): {@link CreateTimeSeriesNode}, {@link 
CreateAlignedTimeSeriesNode},
+ * {@link CreateMultiTimeSeriesNode}, {@link AlterTimeSeriesNode}
+ *
+ * <p>- Timeseries(Auto): {@link InternalCreateTimeSeriesNode}, {@link
+ * InternalCreateMultiTimeSeriesNode}
+ *
+ * <p>- Template(Manual): {@link ActivateTemplateNode}, {@link 
BatchActivateTemplateNode}
+ *
+ * <p>- Template(Auto): {@link InternalBatchActivateTemplateNode}
+ *
+ * <p>- LogicalView: {@link CreateLogicalViewNode}
+ */
+public class PipeEnrichedWriteSchemaNode extends WritePlanNode {
+
+  private final WritePlanNode writeSchemaNode;
+
+  public PipeEnrichedWriteSchemaNode(WritePlanNode schemaWriteNode) {
+    super(schemaWriteNode.getPlanNodeId());
+    this.writeSchemaNode = schemaWriteNode;
+  }
+
+  public WritePlanNode getWriteSchemaNode() {
+    return writeSchemaNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return writeSchemaNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    writeSchemaNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return writeSchemaNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(PlanNodeId id) {
+    writeSchemaNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return writeSchemaNode.getChildren();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    writeSchemaNode.addChild(child);
+  }
+
+  @Override
+  public WritePlanNode clone() {
+    return new PipeEnrichedWriteSchemaNode((WritePlanNode) 
writeSchemaNode.clone());
+  }
+
+  @Override
+  public WritePlanNode createSubNode(int subNodeId, int startIndex, int 
endIndex) {
+    return new PipeEnrichedWriteSchemaNode(
+        (WritePlanNode) writeSchemaNode.createSubNode(subNodeId, startIndex, 
endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    return new PipeEnrichedWriteSchemaNode(
+        (WritePlanNode) writeSchemaNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return writeSchemaNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return writeSchemaNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedWriteSchema(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(byteBuffer);
+    writeSchemaNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(stream);
+    writeSchemaNode.serialize(stream);
+  }
+
+  public static PipeEnrichedWriteSchemaNode deserialize(ByteBuffer buffer) {
+    return new PipeEnrichedWriteSchemaNode((WritePlanNode) 
PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PipeEnrichedWriteSchemaNode
+        && writeSchemaNode.equals(((PipeEnrichedWriteSchemaNode) 
o).writeSchemaNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return writeSchemaNode.hashCode();
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return writeSchemaNode.getRegionReplicaSet();
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    return writeSchemaNode.splitByPartition(analysis).stream()
+        .map(
+            plan ->
+                plan instanceof PipeEnrichedWriteSchemaNode
+                    ? plan
+                    : new PipeEnrichedWriteSchemaNode(plan))
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 0b0fe8da749..5f0b83b64fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -78,8 +78,6 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   protected ProgressIndex progressIndex;
 
-  protected boolean isGeneratedByPipe = false;
-
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -176,14 +174,6 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
   }
 
-  public boolean isGeneratedByPipe() {
-    return isGeneratedByPipe;
-  }
-
-  public void markAsGeneratedByPipe() {
-    isGeneratedByPipe = true;
-  }
-
   // region Serialization methods for WAL
   /** Serialized size of measurement schemas, ignoring failed time series */
   protected int serializeMeasurementSchemasSize() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b673703649a..2b1cf8a68f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,7 +64,6 @@ public enum StatementType {
   BATCH_INSERT_ROWS,
   BATCH_INSERT_ONE_DEVICE,
   MULTI_BATCH_INSERT,
-  PIPE_ENRICHED_INSERT,
 
   DELETE,
 
@@ -174,4 +173,6 @@ public enum StatementType {
   DELETE_LOGICAL_VIEW,
   RENAME_LOGICAL_VIEW,
   ALTER_LOGICAL_VIEW,
+
+  PIPE_ENRICHED,
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 2d34e5affba..c824593bd15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,8 +27,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -98,6 +96,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ClearCacheStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
@@ -283,11 +282,6 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(loadTsFileStatement, context);
   }
 
-  public R visitPipeEnrichedLoadFile(
-      PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C 
context) {
-    return visitStatement(pipeEnrichedLoadTsFileStatement, context);
-  }
-
   public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
     return visitStatement(insertRowStatement, context);
   }
@@ -306,9 +300,8 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(insertRowsOfOneDeviceStatement, context);
   }
 
-  public R visitPipeEnrichedInsert(
-      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C 
context) {
-    return visitStatement(pipeEnrichedInsertBaseStatement, context);
+  public R visitPipeEnrichedStatement(PipeEnrichedStatement 
pipeEnrichedStatement, C context) {
+    return visitStatement(pipeEnrichedStatement, context);
   }
 
   /** Data Control Language (DCL) */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
deleted file mode 100644
index 4b659b4ab15..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.statement.crud;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
-
-  private InsertBaseStatement insertBaseStatement;
-
-  public PipeEnrichedInsertBaseStatement(InsertBaseStatement 
insertBaseStatement) {
-    statementType = StatementType.PIPE_ENRICHED_INSERT;
-    this.insertBaseStatement = insertBaseStatement;
-  }
-
-  public InsertBaseStatement getInsertBaseStatement() {
-    return insertBaseStatement;
-  }
-
-  public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
-    this.insertBaseStatement = insertBaseStatement;
-  }
-
-  @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedInsert(this, context);
-  }
-
-  @Override
-  public boolean isDebug() {
-    return insertBaseStatement.isDebug();
-  }
-
-  @Override
-  public void setDebug(boolean debug) {
-    insertBaseStatement.setDebug(debug);
-  }
-
-  @Override
-  public boolean isQuery() {
-    return !Objects.isNull(insertBaseStatement) && 
insertBaseStatement.isQuery();
-  }
-
-  @Override
-  public PartialPath getDevicePath() {
-    return insertBaseStatement.getDevicePath();
-  }
-
-  @Override
-  public void setDevicePath(PartialPath devicePath) {
-    insertBaseStatement.setDevicePath(devicePath);
-  }
-
-  @Override
-  public String[] getMeasurements() {
-    return insertBaseStatement.getMeasurements();
-  }
-
-  @Override
-  public void setMeasurements(String[] measurements) {
-    insertBaseStatement.setMeasurements(measurements);
-  }
-
-  @Override
-  public MeasurementSchema[] getMeasurementSchemas() {
-    return insertBaseStatement.getMeasurementSchemas();
-  }
-
-  @Override
-  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
-    insertBaseStatement.setMeasurementSchemas(measurementSchemas);
-  }
-
-  @Override
-  public boolean isAligned() {
-    return insertBaseStatement.isAligned();
-  }
-
-  @Override
-  public void setAligned(boolean aligned) {
-    insertBaseStatement.setAligned(aligned);
-  }
-
-  @Override
-  public TSDataType[] getDataTypes() {
-    return insertBaseStatement.getDataTypes();
-  }
-
-  @Override
-  public void setDataTypes(TSDataType[] dataTypes) {
-    insertBaseStatement.setDataTypes(dataTypes);
-  }
-
-  @Override
-  public List<PartialPath> getPaths() {
-    return insertBaseStatement.getPaths();
-  }
-
-  @Override
-  public void updateAfterSchemaValidation() throws QueryProcessException {
-    insertBaseStatement.updateAfterSchemaValidation();
-  }
-
-  @Override
-  protected void selfCheckDataTypes(int index)
-      throws DataTypeMismatchException, PathNotExistException {
-    insertBaseStatement.selfCheckDataTypes(index);
-  }
-
-  @Override
-  public void markFailedMeasurement(int index, Exception cause) {
-    insertBaseStatement.markFailedMeasurement(index, cause);
-  }
-
-  @Override
-  public boolean hasValidMeasurements() {
-    return insertBaseStatement.hasValidMeasurements();
-  }
-
-  @Override
-  public boolean hasFailedMeasurements() {
-    return insertBaseStatement.hasFailedMeasurements();
-  }
-
-  @Override
-  public int getFailedMeasurementNumber() {
-    return insertBaseStatement.getFailedMeasurementNumber();
-  }
-
-  @Override
-  public List<String> getFailedMeasurements() {
-    return insertBaseStatement.getFailedMeasurements();
-  }
-
-  @Override
-  public List<Exception> getFailedExceptions() {
-    return insertBaseStatement.getFailedExceptions();
-  }
-
-  @Override
-  public List<String> getFailedMessages() {
-    return insertBaseStatement.getFailedMessages();
-  }
-
-  @Override
-  public void setFailedMeasurementIndex2Info(
-      Map<Integer, InsertBaseStatement.FailedMeasurementInfo> 
failedMeasurementIndex2Info) {
-    
insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
-  }
-
-  @Override
-  protected Map<PartialPath, List<Pair<String, Integer>>> 
getMapFromDeviceToMeasurementAndIndex() {
-    return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return insertBaseStatement.isEmpty();
-  }
-
-  @Override
-  public ISchemaValidation getSchemaValidation() {
-    return insertBaseStatement.getSchemaValidation();
-  }
-
-  @Override
-  public List<ISchemaValidation> getSchemaValidationList() {
-    return insertBaseStatement.getSchemaValidationList();
-  }
-
-  @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) 
{
-    return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
-  }
-
-  @Override
-  public long getMinTime() {
-    return insertBaseStatement.getMinTime();
-  }
-
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    return insertBaseStatement.getFirstValueOfIndex(index);
-  }
-
-  @Override
-  public InsertBaseStatement removeLogicalView() {
-    return new 
PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
deleted file mode 100644
index c2d9f349052..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.statement.crud;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import java.io.File;
-import java.util.List;
-
-public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
-
-  private final LoadTsFileStatement loadTsFileStatement;
-
-  public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement 
loadTsFileStatement) {
-    this.loadTsFileStatement = loadTsFileStatement;
-    statementType = StatementType.MULTI_BATCH_INSERT;
-  }
-
-  public LoadTsFileStatement getLoadTsFileStatement() {
-    return loadTsFileStatement;
-  }
-
-  @Override
-  public boolean isDebug() {
-    return loadTsFileStatement.isDebug();
-  }
-
-  @Override
-  public void setDebug(boolean debug) {
-    loadTsFileStatement.setDebug(debug);
-  }
-
-  @Override
-  public boolean isQuery() {
-    return loadTsFileStatement.isQuery();
-  }
-
-  @Override
-  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
-    loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
-  }
-
-  @Override
-  public void setDatabaseLevel(int databaseLevel) {
-    loadTsFileStatement.setDatabaseLevel(databaseLevel);
-  }
-
-  @Override
-  public void setVerifySchema(boolean verifySchema) {
-    loadTsFileStatement.setVerifySchema(verifySchema);
-  }
-
-  @Override
-  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
-    loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
-  }
-
-  @Override
-  public boolean isVerifySchema() {
-    return loadTsFileStatement.isVerifySchema();
-  }
-
-  @Override
-  public boolean isDeleteAfterLoad() {
-    return loadTsFileStatement.isDeleteAfterLoad();
-  }
-
-  @Override
-  public boolean isAutoCreateDatabase() {
-    return loadTsFileStatement.isAutoCreateDatabase();
-  }
-
-  @Override
-  public int getDatabaseLevel() {
-    return loadTsFileStatement.getDatabaseLevel();
-  }
-
-  @Override
-  public List<File> getTsFiles() {
-    return loadTsFileStatement.getTsFiles();
-  }
-
-  @Override
-  public void addTsFileResource(TsFileResource resource) {
-    loadTsFileStatement.addTsFileResource(resource);
-  }
-
-  @Override
-  public List<TsFileResource> getResources() {
-    return loadTsFileStatement.getResources();
-  }
-
-  @Override
-  public void addWritePointCount(long writePointCount) {
-    loadTsFileStatement.addWritePointCount(writePointCount);
-  }
-
-  @Override
-  public long getWritePointCount(int resourceIndex) {
-    return loadTsFileStatement.getWritePointCount(resourceIndex);
-  }
-
-  @Override
-  public List<PartialPath> getPaths() {
-    return loadTsFileStatement.getPaths();
-  }
-
-  @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
-    return visitor.visitPipeEnrichedLoadFile(this, context);
-  }
-
-  @Override
-  public String toString() {
-    return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" + 
loadTsFileStatement + '}';
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
new file mode 100644
index 00000000000..1f2080be372
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.pipe;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeEnrichedStatement extends Statement {
+
+  private Statement innerStatement;
+
+  public PipeEnrichedStatement(Statement innerStatement) {
+    statementType = StatementType.PIPE_ENRICHED;
+    this.innerStatement = innerStatement;
+  }
+
+  public Statement getInnerStatement() {
+    return innerStatement;
+  }
+
+  public void setInnerStatement(Statement innerStatement) {
+    this.innerStatement = innerStatement;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedStatement(this, context);
+  }
+
+  @Override
+  public boolean isDebug() {
+    return innerStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    innerStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return !Objects.isNull(innerStatement) && innerStatement.isQuery();
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index 70037a99089..8c85795f1c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,7 +85,7 @@ public class DataNodeThrottleQuotaManager {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
-      case PIPE_ENRICHED_INSERT:
+      case PIPE_ENRICHED:
         return checkQuota(userName, 1, 0, s);
       case QUERY:
       case GROUP_BY_TIME:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index b6b714b5de2..5d5882430d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -29,8 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -95,10 +94,10 @@ public class DefaultOperationQuota implements 
OperationQuota {
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, 
Statement s) {
     if (numWrites > 0) {
       long avgSize = 0;
-      final StatementType statementType =
-          s.getType() == StatementType.PIPE_ENRICHED_INSERT
-              ? ((PipeEnrichedInsertBaseStatement) 
s).getInsertBaseStatement().getType()
-              : s.getType();
+      if (s.getType() == StatementType.PIPE_ENRICHED) {
+        s = ((PipeEnrichedStatement) s).getInnerStatement();
+      }
+      final StatementType statementType = s.getType();
       switch (statementType) {
         case INSERT:
           // InsertStatement  InsertRowStatement
@@ -138,10 +137,7 @@ public class DefaultOperationQuota implements 
OperationQuota {
           }
           break;
         case MULTI_BATCH_INSERT:
-          // PipeEnrichedLoadTsFileStatement  LoadTsFileStatement  
InsertMultiTabletsStatement
-          if (s instanceof PipeEnrichedLoadTsFileStatement) {
-            s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
-          }
+          // LoadTsFileStatement  InsertMultiTabletsStatement
           if (s instanceof LoadTsFileStatement) {
             LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
             for (int i = 0; i < loadTsFileStatement.getResources().size(); 
i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 68f87bcc86f..e791faad552 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -36,13 +36,13 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -253,20 +253,7 @@ public class TriggerFireVisitor extends 
PlanVisitor<TriggerFireResult, TriggerEv
   @Override
   public TriggerFireResult visitPipeEnrichedInsert(
       PipeEnrichedInsertNode node, TriggerEvent context) {
-    final InsertNode realInsertNode = node.getInsertNode();
-    if (realInsertNode instanceof InsertRowNode) {
-      return visitInsertRow((InsertRowNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertTabletNode) {
-      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertRowsNode) {
-      return visitInsertRows((InsertRowsNode) realInsertNode, context);
-    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
-      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
-    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
-      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
-    } else {
-      return visitPlan(realInsertNode, context);
-    }
+    return node.getInsertNode().accept(this, context);
   }
 
   private Map<String, Integer> constructMeasurementToSchemaIndexMap(

Reply via email to