This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch multi-cyclic-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 29b6f2d02a344153f6b709151e368867d29a46b4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 15 15:04:34 2023 +0800

    node visitor
---
 .../dataregion/DataExecutionVisitor.java           |  23 ++
 .../execution/executor/RegionWriteExecutor.java    |   7 +
 .../plan/planner/plan/node/PlanNodeType.java       |   7 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/node/write/InsertMultiTabletsNode.java    |   6 +
 .../plan/planner/plan/node/write/InsertNode.java   |  12 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   6 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   6 +
 .../plan/node/write/PipeEnrichedInsertNode.java    | 276 +++++++++++++++++++++
 .../db/trigger/executor/TriggerFireVisitor.java    |  20 ++
 10 files changed, 366 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 28384aee64b..1e647995a57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -28,10 +28,12 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -164,6 +166,27 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
     }
   }
 
+  @Override
+  public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, 
DataRegion context) {
+    final InsertNode realInsertNode = node.getInsertNode();
+
+    realInsertNode.markAsGeneratedByPipe();
+
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else {
+      return visitPlan(realInsertNode, context);
+    }
+  }
+
   @Override
   public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 1b94e607282..c60787dd737 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -222,6 +223,12 @@ public class RegionWriteExecutor {
       return executeDataInsert(node, context);
     }
 
+    @Override
+    public RegionExecutionResult visitPipeEnrichedInsert(
+        PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
     private RegionExecutionResult executeDataInsert(
         InsertNode insertNode, WritePlanNodeExecutionContext context) {
       RegionExecutionResult response = new RegionExecutionResult();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index d2379d1435d..2364aef9d7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -93,6 +93,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
@@ -177,7 +178,9 @@ public enum PlanNodeType {
   ROLLBACK_LOGICAL_VIEW_BLACK_LIST((short) 75),
   DELETE_LOGICAL_VIEW((short) 76),
   LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
-  ALTER_LOGICAL_VIEW((short) 78);
+  ALTER_LOGICAL_VIEW((short) 78),
+  PIPE_ENRICHED_INSERT((short) 79),
+  ;
 
   public static final int BYTES = Short.BYTES;
 
@@ -380,6 +383,8 @@ public enum PlanNodeType {
         return LogicalViewSchemaScanNode.deserialize(buffer);
       case 78:
         return AlterLogicalViewNode.deserialize(buffer);
+      case 79:
+        return PipeEnrichedInsertNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index c603af07f0e..e6eb22fc5a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -94,6 +94,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 
 public abstract class PlanVisitor<R, C> {
 
@@ -423,6 +424,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitDeleteData(DeleteDataNode node, C context) {
     return visitPlan(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 167019fe361..69af99b48cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -240,6 +240,12 @@ public class InsertMultiTabletsNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index fba7c0574ca..abca512e1e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   protected ProgressIndex progressIndex;
 
+  protected boolean isGeneratedByPipe = false;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -169,6 +171,14 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
   }
 
+  public boolean isGeneratedByPipe() {
+    return isGeneratedByPipe;
+  }
+
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+  }
+
   // region Serialization methods for WAL
   /** Serialized size of measurement schemas, ignoring failed time series */
   protected int serializeMeasurementSchemasSize() {
@@ -274,7 +284,7 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   // region progress index
 
   @Override
-  public final ProgressIndex getProgressIndex() {
+  public ProgressIndex getProgressIndex() {
     return progressIndex;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 61949962285..20394f14ae0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -202,6 +202,12 @@ public class InsertRowsNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 28b436c6237..a05e2080f95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -269,6 +269,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     }
   }
 
+  @Override
+  public void markAsGeneratedByPipe() {
+    isGeneratedByPipe = true;
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
new file mode 100644
index 00000000000..c2988beafe5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeEnrichedInsertNode extends InsertNode {
+
+  private final InsertNode insertNode;
+
+  public PipeEnrichedInsertNode(InsertNode insertNode) {
+    super(insertNode.getPlanNodeId());
+    this.insertNode = insertNode;
+  }
+
+  public InsertNode getInsertNode() {
+    return insertNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return insertNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    insertNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return insertNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(PlanNodeId id) {
+    insertNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return insertNode.getChildren();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    insertNode.addChild(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new PipeEnrichedInsertNode((InsertNode) insertNode.clone());
+  }
+
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new PipeEnrichedInsertNode(
+        (InsertNode) insertNode.createSubNode(subNodeId, startIndex, 
endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    return new PipeEnrichedInsertNode((InsertNode) 
insertNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return insertNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return insertNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedInsert(this, context);
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    return insertNode.splitByPartition(analysis).stream()
+        .map(
+            plan ->
+                plan instanceof PipeEnrichedInsertNode
+                    ? plan
+                    : new PipeEnrichedInsertNode((InsertNode) plan))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public TRegionReplicaSet getDataRegionReplicaSet() {
+    return insertNode.getDataRegionReplicaSet();
+  }
+
+  @Override
+  public void setDataRegionReplicaSet(TRegionReplicaSet dataRegionReplicaSet) {
+    insertNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return insertNode.getDevicePath();
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    insertNode.setDevicePath(devicePath);
+  }
+
+  @Override
+  public boolean isAligned() {
+    return insertNode.isAligned();
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    insertNode.setAligned(aligned);
+  }
+
+  @Override
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return insertNode.getMeasurementSchemas();
+  }
+
+  @Override
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    insertNode.setMeasurementSchemas(measurementSchemas);
+  }
+
+  @Override
+  public String[] getMeasurements() {
+    return insertNode.getMeasurements();
+  }
+
+  @Override
+  public TSDataType[] getDataTypes() {
+    return insertNode.getDataTypes();
+  }
+
+  @Override
+  public TSDataType getDataType(int index) {
+    return insertNode.getDataType(index);
+  }
+
+  @Override
+  public void setDataTypes(TSDataType[] dataTypes) {
+    insertNode.setDataTypes(dataTypes);
+  }
+
+  @Override
+  public IDeviceID getDeviceID() {
+    return insertNode.getDeviceID();
+  }
+
+  @Override
+  public void setDeviceID(IDeviceID deviceID) {
+    insertNode.setDeviceID(deviceID);
+  }
+
+  @Override
+  public long getSearchIndex() {
+    return insertNode.getSearchIndex();
+  }
+
+  @Override
+  public void setSearchIndex(long searchIndex) {
+    insertNode.setSearchIndex(searchIndex);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_INSERT.serialize(byteBuffer);
+    insertNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.PIPE_ENRICHED_INSERT.serialize(stream);
+    insertNode.serialize(stream);
+  }
+
+  public static PlanNode deserialize(ByteBuffer buffer) {
+    return new PipeEnrichedInsertNode((InsertNode) 
PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return insertNode.getRegionReplicaSet();
+  }
+
+  @Override
+  public long getMinTime() {
+    return insertNode.getMinTime();
+  }
+
+  @Override
+  public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
+    return insertNode.isSyncFromLeaderWhenUsingIoTConsensus();
+  }
+
+  @Override
+  public void markFailedMeasurement(int index) {
+    insertNode.markFailedMeasurement(index);
+  }
+
+  @Override
+  public boolean hasValidMeasurements() {
+    return insertNode.hasValidMeasurements();
+  }
+
+  @Override
+  public void setFailedMeasurementNumber(int failedMeasurementNumber) {
+    insertNode.setFailedMeasurementNumber(failedMeasurementNumber);
+  }
+
+  @Override
+  public int getFailedMeasurementNumber() {
+    return insertNode.getFailedMeasurementNumber();
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    insertNode.setProgressIndex(progressIndex);
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return insertNode.getProgressIndex();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PipeEnrichedInsertNode
+        && insertNode.equals(((PipeEnrichedInsertNode) o).insertNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return insertNode.hashCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 1d91ec43ddf..68f87bcc86f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -249,6 +250,25 @@ public class TriggerFireVisitor extends 
PlanVisitor<TriggerFireResult, TriggerEv
     return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : 
TriggerFireResult.SUCCESS;
   }
 
+  @Override
+  public TriggerFireResult visitPipeEnrichedInsert(
+      PipeEnrichedInsertNode node, TriggerEvent context) {
+    final InsertNode realInsertNode = node.getInsertNode();
+    if (realInsertNode instanceof InsertRowNode) {
+      return visitInsertRow((InsertRowNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertTabletNode) {
+      return visitInsertTablet((InsertTabletNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertRowsNode) {
+      return visitInsertRows((InsertRowsNode) realInsertNode, context);
+    } else if (realInsertNode instanceof InsertMultiTabletsNode) {
+      return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, 
context);
+    } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
+      return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) 
realInsertNode, context);
+    } else {
+      return visitPlan(realInsertNode, context);
+    }
+  }
+
   private Map<String, Integer> constructMeasurementToSchemaIndexMap(
       String[] measurements, MeasurementSchema[] schemas) {
     // The index of measurement and schema is the same now.

Reply via email to