This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch multi-cyclic-pipe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 29b6f2d02a344153f6b709151e368867d29a46b4 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Aug 15 15:04:34 2023 +0800 node visitor --- .../dataregion/DataExecutionVisitor.java | 23 ++ .../execution/executor/RegionWriteExecutor.java | 7 + .../plan/planner/plan/node/PlanNodeType.java | 7 +- .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/node/write/InsertMultiTabletsNode.java | 6 + .../plan/planner/plan/node/write/InsertNode.java | 12 +- .../planner/plan/node/write/InsertRowsNode.java | 6 + .../plan/node/write/InsertRowsOfOneDeviceNode.java | 6 + .../plan/node/write/PipeEnrichedInsertNode.java | 276 +++++++++++++++++++++ .../db/trigger/executor/TriggerFireVisitor.java | 20 ++ 10 files changed, 366 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 28384aee64b..1e647995a57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -28,10 +28,12 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -164,6 +166,27 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { } } + @Override + public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, DataRegion context) { + final InsertNode realInsertNode = node.getInsertNode(); + + realInsertNode.markAsGeneratedByPipe(); + + if (realInsertNode instanceof InsertRowNode) { + return visitInsertRow((InsertRowNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertTabletNode) { + return visitInsertTablet((InsertTabletNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsNode) { + return visitInsertRows((InsertRowsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertMultiTabletsNode) { + return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) { + return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) realInsertNode, context); + } else { + return visitPlan(realInsertNode, context); + } + } + @Override public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 1b94e607282..c60787dd737 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; @@ -222,6 +223,12 @@ public class RegionWriteExecutor { return executeDataInsert(node, context); } + @Override + public RegionExecutionResult visitPipeEnrichedInsert( + PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) { + return executeDataInsert(node, context); + } + private RegionExecutionResult executeDataInsert( InsertNode insertNode, WritePlanNodeExecutionContext context) { RegionExecutionResult response = new RegionExecutionResult(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index d2379d1435d..2364aef9d7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -93,6 +93,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; @@ -177,7 +178,9 @@ public enum PlanNodeType { ROLLBACK_LOGICAL_VIEW_BLACK_LIST((short) 75), DELETE_LOGICAL_VIEW((short) 76), LOGICAL_VIEW_SCHEMA_SCAN((short) 77), - ALTER_LOGICAL_VIEW((short) 78); + ALTER_LOGICAL_VIEW((short) 78), + PIPE_ENRICHED_INSERT((short) 79), + ; public static final int BYTES = Short.BYTES; @@ -380,6 +383,8 @@ public enum PlanNodeType { return LogicalViewSchemaScanNode.deserialize(buffer); case 78: return AlterLogicalViewNode.deserialize(buffer); + case 79: + return PipeEnrichedInsertNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index c603af07f0e..e6eb22fc5a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -94,6 +94,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; public abstract class PlanVisitor<R, C> { @@ -423,6 +424,10 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } + public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) { + return visitPlan(node, context); + } + public R visitDeleteData(DeleteDataNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java index 167019fe361..69af99b48cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java @@ -240,6 +240,12 @@ public class InsertMultiTabletsNode extends InsertNode { } } + @Override + public void markAsGeneratedByPipe() { + isGeneratedByPipe = true; + insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index fba7c0574ca..abca512e1e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons protected ProgressIndex progressIndex; + protected boolean isGeneratedByPipe = false; + protected InsertNode(PlanNodeId id) { super(id); } @@ -169,6 +171,14 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons throw new NotImplementedException("serializeAttributes of InsertNode is not implemented"); } + public boolean isGeneratedByPipe() { + return isGeneratedByPipe; + } + + public void markAsGeneratedByPipe() { + isGeneratedByPipe = true; + } + // region Serialization methods for WAL /** Serialized size of measurement schemas, ignoring failed time series */ protected int serializeMeasurementSchemasSize() { @@ -274,7 +284,7 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons // region progress index @Override - public final ProgressIndex getProgressIndex() { + public ProgressIndex getProgressIndex() { return progressIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 61949962285..20394f14ae0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -202,6 +202,12 @@ public class InsertRowsNode extends InsertNode { } } + @Override + public void markAsGeneratedByPipe() { + isGeneratedByPipe = true; + insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe); + } + @Override public List<WritePlanNode> splitByPartition(Analysis analysis) { Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index 28b436c6237..a05e2080f95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -269,6 +269,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode { } } + @Override + public void markAsGeneratedByPipe() { + isGeneratedByPipe = true; + insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java new file mode 100644 index 00000000000..c2988beafe5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + +public class PipeEnrichedInsertNode extends InsertNode { + + private final InsertNode insertNode; + + public PipeEnrichedInsertNode(InsertNode insertNode) { + super(insertNode.getPlanNodeId()); + this.insertNode = insertNode; + } + + public InsertNode getInsertNode() { + return insertNode; + } + + @Override + public boolean isGeneratedByPipe() { + return insertNode.isGeneratedByPipe(); + } + + @Override + public void markAsGeneratedByPipe() { + insertNode.markAsGeneratedByPipe(); + } + + @Override + public PlanNodeId getPlanNodeId() { + return insertNode.getPlanNodeId(); + } + + @Override + public void setPlanNodeId(PlanNodeId id) { + insertNode.setPlanNodeId(id); + } + + @Override + public List<PlanNode> getChildren() { + return insertNode.getChildren(); + } + + @Override + public void addChild(PlanNode child) { + insertNode.addChild(child); + } + + @Override + public PlanNode clone() { + return new PipeEnrichedInsertNode((InsertNode) insertNode.clone()); + } + + @Override + public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { + return new PipeEnrichedInsertNode( + (InsertNode) insertNode.createSubNode(subNodeId, startIndex, endIndex)); + } + + @Override + public PlanNode cloneWithChildren(List<PlanNode> children) { + return new PipeEnrichedInsertNode((InsertNode) insertNode.cloneWithChildren(children)); + } + + @Override + public int allowedChildCount() { + return insertNode.allowedChildCount(); + } + + @Override + public List<String> getOutputColumnNames() { + return insertNode.getOutputColumnNames(); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPipeEnrichedInsert(this, context); + } + + @Override + public List<WritePlanNode> splitByPartition(Analysis analysis) { + return insertNode.splitByPartition(analysis).stream() + .map( + plan -> + plan instanceof PipeEnrichedInsertNode + ? plan + : new PipeEnrichedInsertNode((InsertNode) plan)) + .collect(Collectors.toList()); + } + + @Override + public TRegionReplicaSet getDataRegionReplicaSet() { + return insertNode.getDataRegionReplicaSet(); + } + + @Override + public void setDataRegionReplicaSet(TRegionReplicaSet dataRegionReplicaSet) { + insertNode.setDataRegionReplicaSet(dataRegionReplicaSet); + } + + @Override + public PartialPath getDevicePath() { + return insertNode.getDevicePath(); + } + + @Override + public void setDevicePath(PartialPath devicePath) { + insertNode.setDevicePath(devicePath); + } + + @Override + public boolean isAligned() { + return insertNode.isAligned(); + } + + @Override + public void setAligned(boolean aligned) { + insertNode.setAligned(aligned); + } + + @Override + public MeasurementSchema[] getMeasurementSchemas() { + return insertNode.getMeasurementSchemas(); + } + + @Override + public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) { + insertNode.setMeasurementSchemas(measurementSchemas); + } + + @Override + public String[] getMeasurements() { + return insertNode.getMeasurements(); + } + + @Override + public TSDataType[] getDataTypes() { + return insertNode.getDataTypes(); + } + + @Override + public TSDataType getDataType(int index) { + return insertNode.getDataType(index); + } + + @Override + public void setDataTypes(TSDataType[] dataTypes) { + insertNode.setDataTypes(dataTypes); + } + + @Override + public IDeviceID getDeviceID() { + return insertNode.getDeviceID(); + } + + @Override + public void setDeviceID(IDeviceID deviceID) { + insertNode.setDeviceID(deviceID); + } + + @Override + public long getSearchIndex() { + return insertNode.getSearchIndex(); + } + + @Override + public void setSearchIndex(long searchIndex) { + insertNode.setSearchIndex(searchIndex); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.PIPE_ENRICHED_INSERT.serialize(byteBuffer); + insertNode.serialize(byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.PIPE_ENRICHED_INSERT.serialize(stream); + insertNode.serialize(stream); + } + + public static PlanNode deserialize(ByteBuffer buffer) { + return new PipeEnrichedInsertNode((InsertNode) PlanNodeType.deserialize(buffer)); + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return insertNode.getRegionReplicaSet(); + } + + @Override + public long getMinTime() { + return insertNode.getMinTime(); + } + + @Override + public boolean isSyncFromLeaderWhenUsingIoTConsensus() { + return insertNode.isSyncFromLeaderWhenUsingIoTConsensus(); + } + + @Override + public void markFailedMeasurement(int index) { + insertNode.markFailedMeasurement(index); + } + + @Override + public boolean hasValidMeasurements() { + return insertNode.hasValidMeasurements(); + } + + @Override + public void setFailedMeasurementNumber(int failedMeasurementNumber) { + insertNode.setFailedMeasurementNumber(failedMeasurementNumber); + } + + @Override + public int getFailedMeasurementNumber() { + return insertNode.getFailedMeasurementNumber(); + } + + @Override + public void setProgressIndex(ProgressIndex progressIndex) { + insertNode.setProgressIndex(progressIndex); + } + + @Override + public ProgressIndex getProgressIndex() { + return insertNode.getProgressIndex(); + } + + @Override + public boolean equals(Object o) { + return o instanceof PipeEnrichedInsertNode + && insertNode.equals(((PipeEnrichedInsertNode) o).insertNode); + } + + @Override + public int hashCode() { + return insertNode.hashCode(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java index 1d91ec43ddf..68f87bcc86f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.trigger.service.TriggerManagementService; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp; @@ -249,6 +250,25 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS; } + @Override + public TriggerFireResult visitPipeEnrichedInsert( + PipeEnrichedInsertNode node, TriggerEvent context) { + final InsertNode realInsertNode = node.getInsertNode(); + if (realInsertNode instanceof InsertRowNode) { + return visitInsertRow((InsertRowNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertTabletNode) { + return visitInsertTablet((InsertTabletNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsNode) { + return visitInsertRows((InsertRowsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertMultiTabletsNode) { + return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) { + return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) realInsertNode, context); + } else { + return visitPlan(realInsertNode, context); + } + } + private Map<String, Integer> constructMeasurementToSchemaIndexMap( String[] measurements, MeasurementSchema[] schemas) { // The index of measurement and schema is the same now.
