This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4c6a4df91 Pipe Schema: Receiver Agent: Added pipe enriched planNode
to enable pipe request detection to configure "forwarding-pipe-request" (#11672)
3e4c6a4df91 is described below
commit 3e4c6a4df918513cb1c002614da3837090a4dfc8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 14 21:14:58 2023 +0800
Pipe Schema: Receiver Agent: Added pipe enriched planNode to enable pipe
request detection to configure "forwarding-pipe-request" (#11672)
* Squashed the pipe enriched statements into a single one.
* Expanded the pipe enriched nodes to Insert, Delete, WriteSchema,
ConfigSchema (detailed content in the notes)
* Simplified the logic to execute the enriched nodes.
* Changed AlterLogicalViewNode from WritePlanNode to PlanNode since it's
directly assigned to regions by configNode.
* Fixed DefaultOperationQuota to avoid potential ClassCastException.
---
.../org/apache/iotdb/db/audit/AuditLogger.java | 2 +-
.../dataregion/DataExecutionVisitor.java | 29 +--
.../schemaregion/SchemaExecutionVisitor.java | 14 ++
.../protocol/writeback/WriteBackConnector.java | 4 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 10 +-
.../execution/executor/RegionWriteExecutor.java | 276 ++++++++++++++++++---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 42 +---
.../queryengine/plan/execution/QueryExecution.java | 10 +-
.../plan/planner/LogicalPlanVisitor.java | 56 ++---
.../plan/planner/plan/node/PlanNode.java | 10 +
.../plan/planner/plan/node/PlanNodeType.java | 18 +-
.../plan/planner/plan/node/PlanVisitor.java | 23 +-
.../planner/plan/node/load/LoadTsFileNode.java | 7 +-
.../node/metedata/write/CreateTimeSeriesNode.java | 2 +-
.../metedata/write/view/AlterLogicalViewNode.java | 43 +---
.../node/pipe/PipeEnrichedConfigSchemaNode.java | 161 ++++++++++++
.../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 169 +++++++++++++
.../{write => pipe}/PipeEnrichedInsertNode.java | 19 +-
.../node/pipe/PipeEnrichedWriteSchemaNode.java | 192 ++++++++++++++
.../plan/planner/plan/node/write/InsertNode.java | 10 -
.../queryengine/plan/statement/StatementType.java | 3 +-
.../plan/statement/StatementVisitor.java | 13 +-
.../crud/PipeEnrichedInsertBaseStatement.java | 220 ----------------
.../crud/PipeEnrichedLoadTsFileStatement.java | 137 ----------
.../plan/statement/pipe/PipeEnrichedStatement.java | 72 ++++++
.../quotas/DataNodeThrottleQuotaManager.java | 2 +-
.../rescon/quotas/DefaultOperationQuota.java | 16 +-
.../db/trigger/executor/TriggerFireVisitor.java | 17 +-
28 files changed, 982 insertions(+), 595 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index ed503128aee..4f6cecb32a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -215,7 +215,7 @@ public class AuditLogger {
case BATCH_INSERT_ROWS:
case BATCH_INSERT_ONE_DEVICE:
case MULTI_BATCH_INSERT:
- case PIPE_ENRICHED_INSERT:
+ case PIPE_ENRICHED:
case DELETE:
case SELECT_INTO:
case LOAD_FILES:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 53813fb93ea..845532bc96d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -27,14 +27,14 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -169,23 +169,8 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
@Override
public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node,
DataRegion context) {
- final InsertNode realInsertNode = node.getInsertNode();
-
- realInsertNode.markAsGeneratedByPipe();
-
- if (realInsertNode instanceof InsertRowNode) {
- return visitInsertRow((InsertRowNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertTabletNode) {
- return visitInsertTablet((InsertTabletNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertRowsNode) {
- return visitInsertRows((InsertRowsNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertMultiTabletsNode) {
- return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode,
context);
- } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
- return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode)
realInsertNode, context);
- } else {
- return visitPlan(realInsertNode, context);
- }
+ node.getInsertNode().markAsGeneratedByPipe();
+ return node.getInsertNode().accept(this, context);
}
@Override
@@ -215,4 +200,10 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
}
}
+
+ @Override
+ public TSStatus visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node,
DataRegion context) {
+ node.getDeleteDataNode().markAsGeneratedByPipe();
+ return visitDeleteData((DeleteDataNode) node.getDeleteDataNode(), context);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 904863ddcc6..a1496e7b6d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -50,6 +50,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
@@ -520,6 +522,18 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
}
+ @Override
+ public TSStatus visitPipeEnrichedWriteSchema(
+ PipeEnrichedWriteSchemaNode node, ISchemaRegion schemaRegion) {
+ return node.getWriteSchemaNode().accept(this, schemaRegion);
+ }
+
+ @Override
+ public TSStatus visitPipeEnrichedConfigSchema(
+ PipeEnrichedConfigSchemaNode node, ISchemaRegion schemaRegion) {
+ return node.getConfigSchemaNode().accept(this, schemaRegion);
+ }
+
@Override
public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index eb262704c68..9a2883e6935 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -159,7 +159,7 @@ public class WriteBackConnector implements PipeConnector {
private TSStatus executeStatement(InsertBaseStatement statement) {
return Coordinator.getInstance()
.execute(
- new PipeEnrichedInsertBaseStatement(statement),
+ new PipeEnrichedStatement(statement),
SessionManager.getInstance().requestQueryId(),
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault().getId()),
"",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 03c58874292..128d59af02b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -48,13 +48,11 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.rpc.RpcUtils;
@@ -556,11 +554,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
}
- if (statement instanceof InsertBaseStatement) {
- statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement)
statement);
- } else if (statement instanceof LoadTsFileStatement) {
- statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement)
statement);
- }
+ statement = new PipeEnrichedStatement(statement);
final ExecutionResult result =
Coordinator.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 8c53c63bddb..7c6348fdbb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -52,6 +52,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -59,7 +62,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -104,6 +106,10 @@ public class RegionWriteExecutor {
private final TriggerFireVisitor triggerFireVisitor;
+ private final WritePlanNodeExecutionVisitor executionVisitor;
+
+ private final PipeEnrichedWriteSchemaNodeExecutionVisitor
pipeExecutionVisitor;
+
public RegionWriteExecutor() {
dataRegionConsensus = DataRegionConsensusImpl.getInstance();
schemaRegionConsensus = SchemaRegionConsensusImpl.getInstance();
@@ -111,6 +117,8 @@ public class RegionWriteExecutor {
schemaEngine = SchemaEngine.getInstance();
clusterTemplateManager = ClusterTemplateManager.getInstance();
triggerFireVisitor = new TriggerFireVisitor();
+ executionVisitor = new WritePlanNodeExecutionVisitor();
+ pipeExecutionVisitor = new
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
}
@TestOnly
@@ -127,6 +135,8 @@ public class RegionWriteExecutor {
this.schemaEngine = schemaEngine;
this.clusterTemplateManager = clusterTemplateManager;
this.triggerFireVisitor = triggerFireVisitor;
+ executionVisitor = new WritePlanNodeExecutionVisitor();
+ pipeExecutionVisitor = new
PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor);
}
@SuppressWarnings("squid:S1181")
@@ -134,7 +144,6 @@ public class RegionWriteExecutor {
try {
WritePlanNodeExecutionContext context =
new WritePlanNodeExecutionContext(groupId,
regionManager.getRegionLock(groupId));
- WritePlanNodeExecutionVisitor executionVisitor = new
WritePlanNodeExecutionVisitor();
return planNode.accept(executionVisitor, context);
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
@@ -246,13 +255,13 @@ public class RegionWriteExecutor {
}
}
- private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode
planNode)
+ private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode
insertNode)
throws ConsensusException {
long triggerCostTime = 0;
TSStatus status;
long startTime = System.nanoTime();
// fire Trigger before the insertion
- TriggerFireResult result = triggerFireVisitor.process(planNode,
TriggerEvent.BEFORE_INSERT);
+ TriggerFireResult result = triggerFireVisitor.process(insertNode,
TriggerEvent.BEFORE_INSERT);
triggerCostTime += (System.nanoTime() - startTime);
if (result.equals(TriggerFireResult.TERMINATION)) {
status =
@@ -261,14 +270,14 @@ public class RegionWriteExecutor {
"Failed to complete the insertion because trigger error before
the insertion.");
} else {
long startWriteTime = System.nanoTime();
- status = dataRegionConsensus.write(groupId, planNode);
+ status = dataRegionConsensus.write(groupId, insertNode);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() -
startWriteTime);
// fire Trigger after the insertion
startTime = System.nanoTime();
boolean hasFailedTriggerBeforeInsertion =
result.equals(TriggerFireResult.FAILED_NO_TERMINATION);
- result = triggerFireVisitor.process(planNode,
TriggerEvent.AFTER_INSERT);
+ result = triggerFireVisitor.process(insertNode,
TriggerEvent.AFTER_INSERT);
if (hasFailedTriggerBeforeInsertion ||
!result.equals(TriggerFireResult.SUCCESS)) {
status =
RpcUtils.getStatus(
@@ -281,6 +290,18 @@ public class RegionWriteExecutor {
return status;
}
+ @Override
+ public RegionExecutionResult visitPipeEnrichedDeleteData(
+ PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext
context) {
+ // data deletion should block data insertion, especially when executed
for deleting timeseries
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ return super.visitPipeEnrichedDeleteData(node, context);
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ }
+
@Override
public RegionExecutionResult visitDeleteData(
DeleteDataNode node, WritePlanNodeExecutionContext context) {
@@ -296,6 +317,13 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitCreateTimeSeries(
CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ return executeCreateTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeCreateTimeSeries(
+ CreateTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result =
@@ -312,7 +340,9 @@ public class RegionWriteExecutor {
Collections.singletonList(node.getPath().getMeasurement()),
Collections.singletonList(node.getAlias()));
if (failingMeasurementMap.isEmpty()) {
- return super.visitCreateTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateTimeSeries(node, context);
} else {
MetadataException metadataException = failingMeasurementMap.get(0);
LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -328,13 +358,22 @@ public class RegionWriteExecutor {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitCreateTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateTimeSeries(node, context);
}
}
@Override
public RegionExecutionResult visitCreateAlignedTimeSeries(
CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return executeCreateAlignedTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeCreateAlignedTimeSeries(
+ CreateAlignedTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result =
@@ -350,7 +389,9 @@ public class RegionWriteExecutor {
schemaRegion.checkMeasurementExistence(
node.getDevicePath(), node.getMeasurements(),
node.getAliasList());
if (failingMeasurementMap.isEmpty()) {
- return super.visitCreateAlignedTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateAlignedTimeSeries(node, context);
} else {
MetadataException metadataException =
failingMeasurementMap.values().iterator().next();
LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -366,13 +407,22 @@ public class RegionWriteExecutor {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitCreateAlignedTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateAlignedTimeSeries(node, context);
}
}
@Override
public RegionExecutionResult visitCreateMultiTimeSeries(
CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context)
{
+ return executeCreateMultiTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeCreateMultiTimeSeries(
+ CreateMultiTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result;
@@ -400,7 +450,8 @@ public class RegionWriteExecutor {
}
RegionExecutionResult failingResult =
- registerTimeSeries(measurementGroupMap, node, context,
failingStatus);
+ registerTimeSeries(
+ measurementGroupMap, node, context, failingStatus,
receivedFromPipe);
if (failingResult != null) {
return failingResult;
@@ -416,7 +467,9 @@ public class RegionWriteExecutor {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitCreateMultiTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateMultiTimeSeries(node, context);
}
}
@@ -455,10 +508,14 @@ public class RegionWriteExecutor {
Map<PartialPath, MeasurementGroup> measurementGroupMap,
CreateMultiTimeSeriesNode node,
WritePlanNodeExecutionContext context,
- List<TSStatus> failingStatus) {
+ List<TSStatus> failingStatus,
+ boolean receivedFromPipe) {
if (!measurementGroupMap.isEmpty()) {
// try registering the rest timeseries
- RegionExecutionResult executionResult =
super.visitCreateMultiTimeSeries(node, context);
+ RegionExecutionResult executionResult =
+ receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateMultiTimeSeries(node, context);
if (failingStatus.isEmpty()) {
return executionResult;
}
@@ -476,6 +533,13 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitInternalCreateTimeSeries(
InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return executeInternalCreateTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeInternalCreateTimeSeries(
+ InternalCreateTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result =
@@ -519,20 +583,32 @@ public class RegionWriteExecutor {
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
return processExecutionResultOfInternalCreateSchema(
- super.visitInternalCreateTimeSeries(node, context),
+ receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(
+ new PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitInternalCreateTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitInternalCreateTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitInternalCreateTimeSeries(node, context);
}
}
@Override
public RegionExecutionResult visitInternalCreateMultiTimeSeries(
InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return executeInternalCreateMultiTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result;
@@ -586,14 +662,19 @@ public class RegionWriteExecutor {
}
return processExecutionResultOfInternalCreateSchema(
- super.visitInternalCreateMultiTimeSeries(node, context),
+ receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(
+ new PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitInternalCreateMultiTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitInternalCreateMultiTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitInternalCreateMultiTimeSeries(node, context);
}
}
@@ -676,17 +757,22 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitAlterTimeSeries(
AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ return executeAlterTimeSeries(node, context, false);
+ }
+
+ private RegionExecutionResult executeAlterTimeSeries(
+ AlterTimeSeriesNode node, WritePlanNodeExecutionContext context,
boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
try {
MeasurementPath measurementPath =
schemaRegion.fetchMeasurementPath(node.getPath());
- if (node.isAlterView()) {
- if (!measurementPath.getMeasurementSchema().isLogicalView()) {
- throw new MetadataException(
- String.format("%s is not view.",
measurementPath.getFullPath()));
- }
+ if (node.isAlterView() &&
!measurementPath.getMeasurementSchema().isLogicalView()) {
+ throw new MetadataException(
+ String.format("%s is not view.", measurementPath.getFullPath()));
}
- return super.visitAlterTimeSeries(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitAlterTimeSeries(node, context);
} catch (MetadataException e) {
RegionExecutionResult result = new RegionExecutionResult();
result.setAccepted(true);
@@ -699,6 +785,13 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitActivateTemplate(
ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+ return executeActivateTemplate(node, context, false);
+ }
+
+ private RegionExecutionResult executeActivateTemplate(
+ ActivateTemplateNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
// activate template operation shall be blocked by unset template check
context.getRegionWriteValidationRWLock().readLock().lock();
try {
@@ -722,7 +815,13 @@ public class RegionWriteExecutor {
RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(
schemaRegion, node.getActivatePath(),
templateSetInfo.left.getMeasurementNumber());
- return result == null ? super.visitActivateTemplate(node, context) :
result;
+ if (result == null) {
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitActivateTemplate(node, context);
+ } else {
+ return result;
+ }
} finally {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
@@ -731,6 +830,13 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitBatchActivateTemplate(
BatchActivateTemplateNode node, WritePlanNodeExecutionContext context)
{
+ return executeBatchActivateTemplate(node, context, false);
+ }
+
+ private RegionExecutionResult executeBatchActivateTemplate(
+ BatchActivateTemplateNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
// activate template operation shall be blocked by unset template check
context.getRegionWriteValidationRWLock().readLock().lock();
try {
@@ -760,7 +866,9 @@ public class RegionWriteExecutor {
}
}
- return super.visitBatchActivateTemplate(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitBatchActivateTemplate(node, context);
} finally {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
@@ -769,6 +877,13 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitInternalBatchActivateTemplate(
InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext
context) {
+ return executeInternalBatchActivateTemplate(node, context, false);
+ }
+
+ private RegionExecutionResult executeInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
// activate template operation shall be blocked by unset template check
context.getRegionWriteValidationRWLock().readLock().lock();
try {
@@ -801,7 +916,9 @@ public class RegionWriteExecutor {
}
}
- return super.visitInternalBatchActivateTemplate(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitInternalBatchActivateTemplate(node, context);
} finally {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
@@ -810,12 +927,19 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitCreateLogicalView(
CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
+ return executeCreateLogicalView(node, context, false);
+ }
+
+ private RegionExecutionResult executeCreateLogicalView(
+ CreateLogicalViewNode node,
+ WritePlanNodeExecutionContext context,
+ boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- // step 1. make sure all target paths are NOT exist.
+ // step 1. make sure all target paths do NOT exist.
List<PartialPath> targetPaths = node.getViewPathList();
List<MetadataException> failingMetadataException = new ArrayList<>();
for (PartialPath thisPath : targetPaths) {
@@ -825,12 +949,12 @@ public class RegionWriteExecutor {
thisPath.getDevicePath(),
Collections.singletonList(thisPath.getMeasurement()),
null);
- // merge all exception into one map
+ // merge all exceptions into one map
for (Map.Entry<Integer, MetadataException> entry :
failingMeasurementMap.entrySet()) {
failingMetadataException.add(entry.getValue());
}
}
- // if there is some exception, handle each exception and return
first of them.
+ // if there are some exceptions, handle each exception and return
first of them.
if (!failingMetadataException.isEmpty()) {
MetadataException metadataException =
failingMetadataException.get(0);
LOGGER.error(METADATA_ERROR_MSG, metadataException);
@@ -842,16 +966,102 @@ public class RegionWriteExecutor {
metadataException.getErrorCode(),
metadataException.getMessage()));
return result;
}
- // step 2. make sure all source paths are existed.
- return super.visitCreateLogicalView(node, context);
+ // step 2. make sure all source paths exist.
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateLogicalView(node, context);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
- return super.visitCreateLogicalView(node, context);
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWriteSchema(new
PipeEnrichedWriteSchemaNode(node), context)
+ : super.visitCreateLogicalView(node, context);
}
// end of visitCreateLogicalView
}
+
+ @Override
+ public RegionExecutionResult visitPipeEnrichedWriteSchema(
+ PipeEnrichedWriteSchemaNode node, WritePlanNodeExecutionContext
context) {
+ return node.getWriteSchemaNode().accept(pipeExecutionVisitor, context);
+ }
+ }
+
+ private class PipeEnrichedWriteSchemaNodeExecutionVisitor
+ extends PlanVisitor<RegionExecutionResult,
WritePlanNodeExecutionContext> {
+
+ WritePlanNodeExecutionVisitor visitor;
+
+ private
PipeEnrichedWriteSchemaNodeExecutionVisitor(WritePlanNodeExecutionVisitor
visitor) {
+ this.visitor = visitor;
+ }
+
+ @Override
+ public RegionExecutionResult visitPlan(PlanNode node,
WritePlanNodeExecutionContext context) {
+ throw new UnsupportedOperationException(
+ "PipeEnrichedWriteSchemaNodeExecutionVisitor does not support
visiting general plan.");
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateTimeSeries(
+ CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ return visitor.executeCreateTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateAlignedTimeSeries(
+ CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return visitor.executeCreateAlignedTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateMultiTimeSeries(
+ CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context)
{
+ return visitor.executeCreateMultiTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitInternalCreateTimeSeries(
+ InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return visitor.executeInternalCreateTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ return visitor.executeInternalCreateMultiTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitAlterTimeSeries(
+ AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ return visitor.executeAlterTimeSeries(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitActivateTemplate(
+ ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+ return visitor.executeActivateTemplate(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitBatchActivateTemplate(
+ BatchActivateTemplateNode node, WritePlanNodeExecutionContext context)
{
+ return visitor.executeBatchActivateTemplate(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext
context) {
+ return visitor.executeInternalBatchActivateTemplate(node, context, true);
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateLogicalView(
+ CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
+ return visitor.executeCreateLogicalView(node, context, true);
+ }
}
private static class WritePlanNodeExecutionContext {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 89842c3d1af..6ddf58895ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -100,8 +100,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -133,6 +131,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPath
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
@@ -2570,33 +2569,13 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
@Override
- public Analysis visitPipeEnrichedInsert(
- PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement,
MPPQueryContext context) {
- Analysis analysis;
-
- final InsertBaseStatement insertBaseStatement =
- pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
- if (insertBaseStatement instanceof InsertTabletStatement) {
- analysis = visitInsertTablet((InsertTabletStatement)
insertBaseStatement, context);
- } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
- analysis =
- visitInsertMultiTablets((InsertMultiTabletsStatement)
insertBaseStatement, context);
- } else if (insertBaseStatement instanceof InsertRowStatement) {
- analysis = visitInsertRow((InsertRowStatement) insertBaseStatement,
context);
- } else if (insertBaseStatement instanceof InsertRowsStatement) {
- analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement,
context);
- } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
- analysis =
- visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement)
insertBaseStatement, context);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported insert statement type: " +
insertBaseStatement.getClass().getName());
- }
+ public Analysis visitPipeEnrichedStatement(
+ PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+ Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this,
context);
// statement may be changed because of logical view
- pipeEnrichedInsertBaseStatement.setInsertBaseStatement(
- (InsertBaseStatement) analysis.getStatement());
- analysis.setStatement(pipeEnrichedInsertBaseStatement);
+ pipeEnrichedStatement.setInnerStatement(analysis.getStatement());
+ analysis.setStatement(pipeEnrichedStatement);
return analysis;
}
@@ -2653,15 +2632,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
.analyzeFileByFile();
}
- @Override
- public Analysis visitPipeEnrichedLoadFile(
- PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
- final Analysis analysis =
-
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
- analysis.setStatement(pipeEnrichedLoadTsFileStatement);
- return analysis;
- }
-
/** get analysis according to statement and params */
private Analysis getAnalysisForWriting(
Analysis analysis, List<DataPartitionQueryParam>
dataPartitionQueryParams, String userName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index ce33e169b36..9af8cfa867b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -65,7 +65,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
@@ -318,7 +318,11 @@ public class QueryExecution implements IQueryExecution {
private void schedule() {
final long startTime = System.nanoTime();
- if (rawStatement instanceof LoadTsFileStatement) {
+ boolean isPipeEnrichedTsFileLoad =
+ rawStatement instanceof PipeEnrichedStatement
+ && ((PipeEnrichedStatement) rawStatement).getInnerStatement()
+ instanceof LoadTsFileStatement;
+ if (rawStatement instanceof LoadTsFileStatement ||
isPipeEnrichedTsFileLoad) {
this.scheduler =
new LoadTsFileScheduler(
distributedPlan,
@@ -326,7 +330,7 @@ public class QueryExecution implements IQueryExecution {
stateMachine,
syncInternalServiceClientManager,
partitionFetcher,
- rawStatement instanceof PipeEnrichedLoadTsFileStatement);
+ isPipeEnrichedTsFileLoad);
this.scheduler.start();
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 090c2a55a6f..e55521ea107 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -43,6 +44,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -51,20 +55,16 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -87,6 +87,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -558,37 +559,20 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
}
@Override
- public PlanNode visitPipeEnrichedInsert(
- PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement,
MPPQueryContext context) {
- InsertNode insertNode;
-
- final InsertBaseStatement insertBaseStatement =
- pipeEnrichedInsertBaseStatement.getInsertBaseStatement();
- if (insertBaseStatement instanceof InsertRowStatement) {
- insertNode =
- (InsertRowNode) visitInsertRow((InsertRowStatement)
insertBaseStatement, context);
- } else if (insertBaseStatement instanceof InsertRowsStatement) {
- insertNode =
- (InsertRowsNode) visitInsertRows((InsertRowsStatement)
insertBaseStatement, context);
- } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) {
- insertNode =
- (InsertRowsOfOneDeviceNode)
- visitInsertRowsOfOneDevice(
- (InsertRowsOfOneDeviceStatement) insertBaseStatement,
context);
- } else if (insertBaseStatement instanceof InsertTabletStatement) {
- insertNode =
- (InsertTabletNode)
- visitInsertTablet((InsertTabletStatement) insertBaseStatement,
context);
- } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) {
- insertNode =
- (InsertMultiTabletsNode)
- visitInsertMultiTablets((InsertMultiTabletsStatement)
insertBaseStatement, context);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported insert statement type: " +
insertBaseStatement.getClass().getName());
+ public PlanNode visitPipeEnrichedStatement(
+ PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
+ WritePlanNode node =
+ (WritePlanNode) pipeEnrichedStatement.getInnerStatement().accept(this,
context);
+
+ if (node instanceof LoadTsFileNode) {
+ return node;
+ } else if (node instanceof InsertNode) {
+ return new PipeEnrichedInsertNode((InsertNode) node);
+ } else if (node instanceof DeleteDataNode) {
+ return new PipeEnrichedDeleteDataNode((DeleteDataNode) node);
}
- return new PipeEnrichedInsertNode(insertNode);
+ return new PipeEnrichedWriteSchemaNode(node);
}
@Override
@@ -597,12 +581,6 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
context.getQueryId().genPlanNodeId(),
loadTsFileStatement.getResources());
}
- @Override
- public PlanNode visitPipeEnrichedLoadFile(
- PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement,
MPPQueryContext context) {
- return
visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(),
context);
- }
-
@Override
public PlanNode visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 2b73576ea03..18f678974b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -46,6 +46,8 @@ public abstract class PlanNode implements IConsensusRequest {
protected PlanNodeId id;
+ protected boolean isGeneratedByPipe = false;
+
protected PlanNode(PlanNodeId id) {
requireNonNull(id, "id is null");
this.id = id;
@@ -59,6 +61,14 @@ public abstract class PlanNode implements IConsensusRequest {
this.id = id;
}
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
+ }
+
+ public void markAsGeneratedByPipe() {
+ isGeneratedByPipe = true;
+ }
+
public abstract List<PlanNode> getChildren();
public abstract void addChild(PlanNode child);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index ea819b466f5..a17ea3493fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -56,6 +56,10 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -97,7 +101,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
@@ -189,7 +192,11 @@ public enum PlanNodeType {
LAST_QUERY_TRANSFORM((short) 81),
TOP_K((short) 82),
- COLUMN_INJECT((short) 83);
+ COLUMN_INJECT((short) 83),
+ PIPE_ENRICHED_DELETE_DATA((short) 84),
+ PIPE_ENRICHED_WRITE_SCHEMA((short) 85),
+ PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
+ ;
public static final int BYTES = Short.BYTES;
@@ -404,6 +411,13 @@ public enum PlanNodeType {
return TopKNode.deserialize(buffer);
case 83:
return ColumnInjectNode.deserialize(buffer);
+ case 84:
+ return PipeEnrichedDeleteDataNode.deserialize(buffer);
+ case 85:
+ return PipeEnrichedWriteSchemaNode.deserialize(buffer);
+ case 86:
+ return PipeEnrichedConfigSchemaNode.deserialize(buffer);
+
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 9dbb3df3b57..104242486d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -53,6 +53,10 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -97,7 +101,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
public abstract class PlanVisitor<R, C> {
@@ -439,11 +442,27 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitDeleteData(DeleteDataNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // Pipe Enriched Node
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) {
return visitPlan(node, context);
}
- public R visitDeleteData(DeleteDataNode node, C context) {
+ public R visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, C
context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitPipeEnrichedWriteSchema(PipeEnrichedWriteSchemaNode node, C
context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitPipeEnrichedConfigSchema(PipeEnrichedConfigSchemaNode node, C
context) {
return visitPlan(node, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 23a5620a611..997f5bd89e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -88,7 +89,11 @@ public class LoadTsFileNode extends WritePlanNode {
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
List<WritePlanNode> res = new ArrayList<>();
- LoadTsFileStatement statement = (LoadTsFileStatement)
analysis.getStatement();
+ LoadTsFileStatement statement =
+ analysis.getStatement() instanceof PipeEnrichedStatement
+ ? (LoadTsFileStatement)
+ ((PipeEnrichedStatement)
analysis.getStatement()).getInnerStatement()
+ : (LoadTsFileStatement) analysis.getStatement();
for (int i = 0; i < resources.size(); i++) {
res.add(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 43b8f9c6510..060acc2f9ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -187,7 +187,7 @@ public class CreateTimeSeriesNode extends WritePlanNode
implements ICreateTimeSe
public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
String id;
- PartialPath path = null;
+ PartialPath path;
TSDataType dataType;
TSEncoding encoding;
CompressionType compressor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
index 4122ebfb461..664575e1bf7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java
@@ -19,16 +19,13 @@
package
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -41,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class AlterLogicalViewNode extends WritePlanNode {
+public class AlterLogicalViewNode extends PlanNode {
/**
* A map from target path to source expression. Yht target path is the name
of this logical view,
@@ -49,12 +46,6 @@ public class AlterLogicalViewNode extends WritePlanNode {
*/
private final Map<PartialPath, ViewExpression> viewPathToSourceMap;
- /**
- * This variable will be set in function splitByPartition() according to
analysis. And it will be
- * set when creating new split nodes.
- */
- private final TRegionReplicaSet regionReplicaSet = null;
-
public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression>
viewPathToSourceMap) {
super(id);
this.viewPathToSourceMap = viewPathToSourceMap;
@@ -71,11 +62,6 @@ public class AlterLogicalViewNode extends WritePlanNode {
return visitor.visitAlterLogicalView(this, context);
}
- @Override
- public TRegionReplicaSet getRegionReplicaSet() {
- return this.regionReplicaSet;
- }
-
@Override
public List<PlanNode> getChildren() {
return new ArrayList<>();
@@ -160,32 +146,5 @@ public class AlterLogicalViewNode extends WritePlanNode {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap);
}
-
- @Override
- public List<WritePlanNode> splitByPartition(Analysis analysis) {
- Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new
HashMap<>();
- for (Map.Entry<PartialPath, ViewExpression> entry :
this.viewPathToSourceMap.entrySet()) {
- // for each entry in the map for target path to source expression,
- // build a map from TRegionReplicaSet to this entry.
- // Please note that getSchemaRegionReplicaSet needs a device path as
parameter.
- TRegionReplicaSet regionReplicaSet =
-
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getDevice());
-
- // create a map if the key(regionReplicaSet) is not exists,
- // then put this entry into this map(from regionReplicaSet to this entry)
- splitMap
- .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- }
-
- // split this node into several nodes according to their regionReplicaSet
- List<WritePlanNode> result = new ArrayList<>();
- for (Map.Entry<TRegionReplicaSet, Map<PartialPath, ViewExpression>> entry :
- splitMap.entrySet()) {
- // for each entry in splitMap, create a plan node.
- result.add(new CreateLogicalViewNode(getPlanNodeId(), entry.getValue(),
entry.getKey()));
- }
- return result;
- }
// endregion
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
new file mode 100644
index 00000000000..1ff6cabb7a2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * This class aims to mark the {@link PlanNode} of schema operation assigned
by configNode to
+ * selectively forward the operations, {@link Statement}s of which extending
{@link
+ * IConfigStatement}. The handling logic is defined only in {@link
SchemaExecutionVisitor} to
+ * execute deletion logic and mark it as received, since the request does not
need to pass {@link
+ * RegionWriteExecutor} and is assigned directly to regions by configNode.
+ *
+ * <p>
+ *
+ * <p>Notes: The contents includes all the collected {@link PlanNode}s of
schema deletion:
+ *
+ * <p>- Timeseries: {@link DeleteTimeSeriesNode}
+ *
+ * <p>- Template: {@link DeactivateTemplateNode}
+ *
+ * <p>- LogicalView: {@link AlterLogicalViewNode}, {@link
DeleteLogicalViewNode}
+ *
+ * <p>Intermediate nodes like {@link ConstructSchemaBlackListNode} will not be
included since they
+ * will not be collected by pipe.
+ */
+public class PipeEnrichedConfigSchemaNode extends PlanNode {
+
+ private final PlanNode configSchemaNode;
+
+ public PipeEnrichedConfigSchemaNode(PlanNode configSchemaNode) {
+ super(configSchemaNode.getPlanNodeId());
+ this.configSchemaNode = configSchemaNode;
+ }
+
+ public PlanNode getConfigSchemaNode() {
+ return configSchemaNode;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return configSchemaNode.isGeneratedByPipe();
+ }
+
+ @Override
+ public void markAsGeneratedByPipe() {
+ configSchemaNode.markAsGeneratedByPipe();
+ }
+
+ @Override
+ public PlanNodeId getPlanNodeId() {
+ return configSchemaNode.getPlanNodeId();
+ }
+
+ @Override
+ public void setPlanNodeId(PlanNodeId id) {
+ configSchemaNode.setPlanNodeId(id);
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return configSchemaNode.getChildren();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ configSchemaNode.addChild(child);
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new PipeEnrichedConfigSchemaNode(configSchemaNode.clone());
+ }
+
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new PipeEnrichedConfigSchemaNode(
+ configSchemaNode.createSubNode(subNodeId, startIndex, endIndex));
+ }
+
+ @Override
+ public PlanNode cloneWithChildren(List<PlanNode> children) {
+ return new
PipeEnrichedConfigSchemaNode(configSchemaNode.cloneWithChildren(children));
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return configSchemaNode.allowedChildCount();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return configSchemaNode.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedConfigSchema(this, context);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(byteBuffer);
+ configSchemaNode.serialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(stream);
+ configSchemaNode.serialize(stream);
+ }
+
+ public static PipeEnrichedConfigSchemaNode deserialize(ByteBuffer buffer) {
+ return new PipeEnrichedConfigSchemaNode(PlanNodeType.deserialize(buffer));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof PipeEnrichedConfigSchemaNode
+ && configSchemaNode.equals(((PipeEnrichedConfigSchemaNode)
o).configSchemaNode);
+ }
+
+ @Override
+ public int hashCode() {
+ return configSchemaNode.hashCode();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
new file mode 100644
index 00000000000..29d2e011c58
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class aims to mark the {@link DeleteDataNode} to enable selectively
forwarding pipe
+ * deletion. The handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data
region.
+ *
+ * <p>2.{@link DataExecutionVisitor}, to actually write data on data region
and mark it as received
+ * from pipe.
+ */
+public class PipeEnrichedDeleteDataNode extends DeleteDataNode {
+
+ private final DeleteDataNode deleteDataNode;
+
+ public PipeEnrichedDeleteDataNode(DeleteDataNode deleteDataNode) {
+ super(
+ deleteDataNode.getPlanNodeId(),
+ deleteDataNode.getPathList(),
+ deleteDataNode.getDeleteStartTime(),
+ deleteDataNode.getDeleteEndTime());
+ this.deleteDataNode = deleteDataNode;
+ }
+
+ public PlanNode getDeleteDataNode() {
+ return deleteDataNode;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return deleteDataNode.isGeneratedByPipe();
+ }
+
+ @Override
+ public void markAsGeneratedByPipe() {
+ deleteDataNode.markAsGeneratedByPipe();
+ }
+
+ @Override
+ public PlanNodeId getPlanNodeId() {
+ return deleteDataNode.getPlanNodeId();
+ }
+
+ @Override
+ public void setPlanNodeId(PlanNodeId id) {
+ deleteDataNode.setPlanNodeId(id);
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return deleteDataNode.getChildren();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ deleteDataNode.addChild(child);
+ }
+
+ @Override
+ public DeleteDataNode clone() {
+ return new PipeEnrichedDeleteDataNode((DeleteDataNode)
deleteDataNode.clone());
+ }
+
+ @Override
+ public DeleteDataNode createSubNode(int subNodeId, int startIndex, int
endIndex) {
+ return new PipeEnrichedDeleteDataNode(
+ (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex,
endIndex));
+ }
+
+ @Override
+ public PlanNode cloneWithChildren(List<PlanNode> children) {
+ return new PipeEnrichedDeleteDataNode(
+ (DeleteDataNode) deleteDataNode.cloneWithChildren(children));
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return deleteDataNode.allowedChildCount();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return deleteDataNode.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedDeleteData(this, context);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(byteBuffer);
+ deleteDataNode.serialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(stream);
+ deleteDataNode.serialize(stream);
+ }
+
+ public static PipeEnrichedDeleteDataNode deserialize(ByteBuffer buffer) {
+ return new PipeEnrichedDeleteDataNode((DeleteDataNode)
PlanNodeType.deserialize(buffer));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof PipeEnrichedDeleteDataNode
+ && deleteDataNode.equals(((PipeEnrichedDeleteDataNode)
o).deleteDataNode);
+ }
+
+ @Override
+ public int hashCode() {
+ return deleteDataNode.hashCode();
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return deleteDataNode.getRegionReplicaSet();
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ return deleteDataNode.splitByPartition(analysis).stream()
+ .map(
+ plan ->
+ plan instanceof PipeEnrichedDeleteDataNode
+ ? plan
+ : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
similarity index 89%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 28d217176f4..2f2408f8097 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -17,18 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
+import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -38,6 +42,17 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
+/**
+ * This class aims to mark the {@link InsertNode} to prevent forwarding pipe
insertions. The
+ * handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data
region.
+ *
+ * <p>2.{@link TriggerFireVisitor}, to fire the trigger before writing to data
region.
+ *
+ * <p>3.{@link DataExecutionVisitor}, to actually write data on data region
and mark it as received
+ * from pipe.
+ */
public class PipeEnrichedInsertNode extends InsertNode {
private final InsertNode insertNode;
@@ -215,7 +230,7 @@ public class PipeEnrichedInsertNode extends InsertNode {
insertNode.serialize(stream);
}
- public static PlanNode deserialize(ByteBuffer buffer) {
+ public static PipeEnrichedInsertNode deserialize(ByteBuffer buffer) {
return new PipeEnrichedInsertNode((InsertNode)
PlanNodeType.deserialize(buffer));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
new file mode 100644
index 00000000000..4ac9ffbe69e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import
org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor;
+import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class aims to mark the {@link WritePlanNode} of schema writing to
selectively forward pipe
+ * schema operations, {@link Statement}s of which passing through {@link
SchemaExecutionVisitor}.
+ * The handling logic is defined in:
+ *
+ * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target schema
region.
+ *
+ * <p>2.{@link SchemaExecutionVisitor}, to actually write data on schema
region and mark it as
+ * received from pipe.
+ *
+ * <p>
+ *
+ * <p>Notes: The content varies in all the {@link WritePlanNode}s of schema
writing nodes:
+ *
+ * <p>- Timeseries(Manual): {@link CreateTimeSeriesNode}, {@link
CreateAlignedTimeSeriesNode},
+ * {@link CreateMultiTimeSeriesNode}, {@link AlterTimeSeriesNode}
+ *
+ * <p>- Timeseries(Auto): {@link InternalCreateTimeSeriesNode}, {@link
+ * InternalCreateMultiTimeSeriesNode}
+ *
+ * <p>- Template(Manual): {@link ActivateTemplateNode}, {@link
BatchActivateTemplateNode}
+ *
+ * <p>- Template(Auto): {@link InternalBatchActivateTemplateNode}
+ *
+ * <p>- LogicalView: {@link CreateLogicalViewNode}
+ */
+public class PipeEnrichedWriteSchemaNode extends WritePlanNode {
+
+ private final WritePlanNode writeSchemaNode;
+
+ public PipeEnrichedWriteSchemaNode(WritePlanNode schemaWriteNode) {
+ super(schemaWriteNode.getPlanNodeId());
+ this.writeSchemaNode = schemaWriteNode;
+ }
+
+ public WritePlanNode getWriteSchemaNode() {
+ return writeSchemaNode;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return writeSchemaNode.isGeneratedByPipe();
+ }
+
+ @Override
+ public void markAsGeneratedByPipe() {
+ writeSchemaNode.markAsGeneratedByPipe();
+ }
+
+ @Override
+ public PlanNodeId getPlanNodeId() {
+ return writeSchemaNode.getPlanNodeId();
+ }
+
+ @Override
+ public void setPlanNodeId(PlanNodeId id) {
+ writeSchemaNode.setPlanNodeId(id);
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return writeSchemaNode.getChildren();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ writeSchemaNode.addChild(child);
+ }
+
+ @Override
+ public WritePlanNode clone() {
+ return new PipeEnrichedWriteSchemaNode((WritePlanNode)
writeSchemaNode.clone());
+ }
+
+ @Override
+ public WritePlanNode createSubNode(int subNodeId, int startIndex, int
endIndex) {
+ return new PipeEnrichedWriteSchemaNode(
+ (WritePlanNode) writeSchemaNode.createSubNode(subNodeId, startIndex,
endIndex));
+ }
+
+ @Override
+ public PlanNode cloneWithChildren(List<PlanNode> children) {
+ return new PipeEnrichedWriteSchemaNode(
+ (WritePlanNode) writeSchemaNode.cloneWithChildren(children));
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return writeSchemaNode.allowedChildCount();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return writeSchemaNode.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedWriteSchema(this, context);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(byteBuffer);
+ writeSchemaNode.serialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(stream);
+ writeSchemaNode.serialize(stream);
+ }
+
+ public static PipeEnrichedWriteSchemaNode deserialize(ByteBuffer buffer) {
+ return new PipeEnrichedWriteSchemaNode((WritePlanNode)
PlanNodeType.deserialize(buffer));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof PipeEnrichedWriteSchemaNode
+ && writeSchemaNode.equals(((PipeEnrichedWriteSchemaNode)
o).writeSchemaNode);
+ }
+
+ @Override
+ public int hashCode() {
+ return writeSchemaNode.hashCode();
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return writeSchemaNode.getRegionReplicaSet();
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ return writeSchemaNode.splitByPartition(analysis).stream()
+ .map(
+ plan ->
+ plan instanceof PipeEnrichedWriteSchemaNode
+ ? plan
+ : new PipeEnrichedWriteSchemaNode(plan))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 0b0fe8da749..5f0b83b64fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -78,8 +78,6 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected ProgressIndex progressIndex;
- protected boolean isGeneratedByPipe = false;
-
protected InsertNode(PlanNodeId id) {
super(id);
}
@@ -176,14 +174,6 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
throw new NotImplementedException("serializeAttributes of InsertNode is
not implemented");
}
- public boolean isGeneratedByPipe() {
- return isGeneratedByPipe;
- }
-
- public void markAsGeneratedByPipe() {
- isGeneratedByPipe = true;
- }
-
// region Serialization methods for WAL
/** Serialized size of measurement schemas, ignoring failed time series */
protected int serializeMeasurementSchemasSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b673703649a..2b1cf8a68f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,7 +64,6 @@ public enum StatementType {
BATCH_INSERT_ROWS,
BATCH_INSERT_ONE_DEVICE,
MULTI_BATCH_INSERT,
- PIPE_ENRICHED_INSERT,
DELETE,
@@ -174,4 +173,6 @@ public enum StatementType {
DELETE_LOGICAL_VIEW,
RENAME_LOGICAL_VIEW,
ALTER_LOGICAL_VIEW,
+
+ PIPE_ENRICHED,
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 2d34e5affba..c824593bd15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,8 +27,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -98,6 +96,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ClearCacheStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
@@ -283,11 +282,6 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(loadTsFileStatement, context);
}
- public R visitPipeEnrichedLoadFile(
- PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C
context) {
- return visitStatement(pipeEnrichedLoadTsFileStatement, context);
- }
-
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
@@ -306,9 +300,8 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(insertRowsOfOneDeviceStatement, context);
}
- public R visitPipeEnrichedInsert(
- PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C
context) {
- return visitStatement(pipeEnrichedInsertBaseStatement, context);
+ public R visitPipeEnrichedStatement(PipeEnrichedStatement
pipeEnrichedStatement, C context) {
+ return visitStatement(pipeEnrichedStatement, context);
}
/** Data Control Language (DCL) */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
deleted file mode 100644
index 4b659b4ab15..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.statement.crud;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
-
- private InsertBaseStatement insertBaseStatement;
-
- public PipeEnrichedInsertBaseStatement(InsertBaseStatement
insertBaseStatement) {
- statementType = StatementType.PIPE_ENRICHED_INSERT;
- this.insertBaseStatement = insertBaseStatement;
- }
-
- public InsertBaseStatement getInsertBaseStatement() {
- return insertBaseStatement;
- }
-
- public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) {
- this.insertBaseStatement = insertBaseStatement;
- }
-
- @Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
- return visitor.visitPipeEnrichedInsert(this, context);
- }
-
- @Override
- public boolean isDebug() {
- return insertBaseStatement.isDebug();
- }
-
- @Override
- public void setDebug(boolean debug) {
- insertBaseStatement.setDebug(debug);
- }
-
- @Override
- public boolean isQuery() {
- return !Objects.isNull(insertBaseStatement) &&
insertBaseStatement.isQuery();
- }
-
- @Override
- public PartialPath getDevicePath() {
- return insertBaseStatement.getDevicePath();
- }
-
- @Override
- public void setDevicePath(PartialPath devicePath) {
- insertBaseStatement.setDevicePath(devicePath);
- }
-
- @Override
- public String[] getMeasurements() {
- return insertBaseStatement.getMeasurements();
- }
-
- @Override
- public void setMeasurements(String[] measurements) {
- insertBaseStatement.setMeasurements(measurements);
- }
-
- @Override
- public MeasurementSchema[] getMeasurementSchemas() {
- return insertBaseStatement.getMeasurementSchemas();
- }
-
- @Override
- public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
- insertBaseStatement.setMeasurementSchemas(measurementSchemas);
- }
-
- @Override
- public boolean isAligned() {
- return insertBaseStatement.isAligned();
- }
-
- @Override
- public void setAligned(boolean aligned) {
- insertBaseStatement.setAligned(aligned);
- }
-
- @Override
- public TSDataType[] getDataTypes() {
- return insertBaseStatement.getDataTypes();
- }
-
- @Override
- public void setDataTypes(TSDataType[] dataTypes) {
- insertBaseStatement.setDataTypes(dataTypes);
- }
-
- @Override
- public List<PartialPath> getPaths() {
- return insertBaseStatement.getPaths();
- }
-
- @Override
- public void updateAfterSchemaValidation() throws QueryProcessException {
- insertBaseStatement.updateAfterSchemaValidation();
- }
-
- @Override
- protected void selfCheckDataTypes(int index)
- throws DataTypeMismatchException, PathNotExistException {
- insertBaseStatement.selfCheckDataTypes(index);
- }
-
- @Override
- public void markFailedMeasurement(int index, Exception cause) {
- insertBaseStatement.markFailedMeasurement(index, cause);
- }
-
- @Override
- public boolean hasValidMeasurements() {
- return insertBaseStatement.hasValidMeasurements();
- }
-
- @Override
- public boolean hasFailedMeasurements() {
- return insertBaseStatement.hasFailedMeasurements();
- }
-
- @Override
- public int getFailedMeasurementNumber() {
- return insertBaseStatement.getFailedMeasurementNumber();
- }
-
- @Override
- public List<String> getFailedMeasurements() {
- return insertBaseStatement.getFailedMeasurements();
- }
-
- @Override
- public List<Exception> getFailedExceptions() {
- return insertBaseStatement.getFailedExceptions();
- }
-
- @Override
- public List<String> getFailedMessages() {
- return insertBaseStatement.getFailedMessages();
- }
-
- @Override
- public void setFailedMeasurementIndex2Info(
- Map<Integer, InsertBaseStatement.FailedMeasurementInfo>
failedMeasurementIndex2Info) {
-
insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
- }
-
- @Override
- protected Map<PartialPath, List<Pair<String, Integer>>>
getMapFromDeviceToMeasurementAndIndex() {
- return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
- }
-
- @Override
- public boolean isEmpty() {
- return insertBaseStatement.isEmpty();
- }
-
- @Override
- public ISchemaValidation getSchemaValidation() {
- return insertBaseStatement.getSchemaValidation();
- }
-
- @Override
- public List<ISchemaValidation> getSchemaValidationList() {
- return insertBaseStatement.getSchemaValidationList();
- }
-
- @Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType)
{
- return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
- }
-
- @Override
- public long getMinTime() {
- return insertBaseStatement.getMinTime();
- }
-
- @Override
- public Object getFirstValueOfIndex(int index) {
- return insertBaseStatement.getFirstValueOfIndex(index);
- }
-
- @Override
- public InsertBaseStatement removeLogicalView() {
- return new
PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
deleted file mode 100644
index c2d9f349052..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.statement.crud;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import java.io.File;
-import java.util.List;
-
-public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement {
-
- private final LoadTsFileStatement loadTsFileStatement;
-
- public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement
loadTsFileStatement) {
- this.loadTsFileStatement = loadTsFileStatement;
- statementType = StatementType.MULTI_BATCH_INSERT;
- }
-
- public LoadTsFileStatement getLoadTsFileStatement() {
- return loadTsFileStatement;
- }
-
- @Override
- public boolean isDebug() {
- return loadTsFileStatement.isDebug();
- }
-
- @Override
- public void setDebug(boolean debug) {
- loadTsFileStatement.setDebug(debug);
- }
-
- @Override
- public boolean isQuery() {
- return loadTsFileStatement.isQuery();
- }
-
- @Override
- public void setDeleteAfterLoad(boolean deleteAfterLoad) {
- loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad);
- }
-
- @Override
- public void setDatabaseLevel(int databaseLevel) {
- loadTsFileStatement.setDatabaseLevel(databaseLevel);
- }
-
- @Override
- public void setVerifySchema(boolean verifySchema) {
- loadTsFileStatement.setVerifySchema(verifySchema);
- }
-
- @Override
- public void setAutoCreateDatabase(boolean autoCreateDatabase) {
- loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase);
- }
-
- @Override
- public boolean isVerifySchema() {
- return loadTsFileStatement.isVerifySchema();
- }
-
- @Override
- public boolean isDeleteAfterLoad() {
- return loadTsFileStatement.isDeleteAfterLoad();
- }
-
- @Override
- public boolean isAutoCreateDatabase() {
- return loadTsFileStatement.isAutoCreateDatabase();
- }
-
- @Override
- public int getDatabaseLevel() {
- return loadTsFileStatement.getDatabaseLevel();
- }
-
- @Override
- public List<File> getTsFiles() {
- return loadTsFileStatement.getTsFiles();
- }
-
- @Override
- public void addTsFileResource(TsFileResource resource) {
- loadTsFileStatement.addTsFileResource(resource);
- }
-
- @Override
- public List<TsFileResource> getResources() {
- return loadTsFileStatement.getResources();
- }
-
- @Override
- public void addWritePointCount(long writePointCount) {
- loadTsFileStatement.addWritePointCount(writePointCount);
- }
-
- @Override
- public long getWritePointCount(int resourceIndex) {
- return loadTsFileStatement.getWritePointCount(resourceIndex);
- }
-
- @Override
- public List<PartialPath> getPaths() {
- return loadTsFileStatement.getPaths();
- }
-
- @Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
- return visitor.visitPipeEnrichedLoadFile(this, context);
- }
-
- @Override
- public String toString() {
- return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" +
loadTsFileStatement + '}';
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
new file mode 100644
index 00000000000..1f2080be372
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.pipe;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeEnrichedStatement extends Statement {
+
+ private Statement innerStatement;
+
+ public PipeEnrichedStatement(Statement innerStatement) {
+ statementType = StatementType.PIPE_ENRICHED;
+ this.innerStatement = innerStatement;
+ }
+
+ public Statement getInnerStatement() {
+ return innerStatement;
+ }
+
+ public void setInnerStatement(Statement innerStatement) {
+ this.innerStatement = innerStatement;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitPipeEnrichedStatement(this, context);
+ }
+
+ @Override
+ public boolean isDebug() {
+ return innerStatement.isDebug();
+ }
+
+ @Override
+ public void setDebug(boolean debug) {
+ innerStatement.setDebug(debug);
+ }
+
+ @Override
+ public boolean isQuery() {
+ return !Objects.isNull(innerStatement) && innerStatement.isQuery();
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index 70037a99089..8c85795f1c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,7 +85,7 @@ public class DataNodeThrottleQuotaManager {
case BATCH_INSERT_ONE_DEVICE:
case BATCH_INSERT_ROWS:
case MULTI_BATCH_INSERT:
- case PIPE_ENRICHED_INSERT:
+ case PIPE_ENRICHED:
return checkQuota(userName, 1, 0, s);
case QUERY:
case GROUP_BY_TIME:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index b6b714b5de2..5d5882430d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -29,8 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -95,10 +94,10 @@ public class DefaultOperationQuota implements
OperationQuota {
protected void updateEstimateConsumeQuota(int numWrites, int numReads,
Statement s) {
if (numWrites > 0) {
long avgSize = 0;
- final StatementType statementType =
- s.getType() == StatementType.PIPE_ENRICHED_INSERT
- ? ((PipeEnrichedInsertBaseStatement)
s).getInsertBaseStatement().getType()
- : s.getType();
+ if (s.getType() == StatementType.PIPE_ENRICHED) {
+ s = ((PipeEnrichedStatement) s).getInnerStatement();
+ }
+ final StatementType statementType = s.getType();
switch (statementType) {
case INSERT:
// InsertStatement InsertRowStatement
@@ -138,10 +137,7 @@ public class DefaultOperationQuota implements
OperationQuota {
}
break;
case MULTI_BATCH_INSERT:
- // PipeEnrichedLoadTsFileStatement LoadTsFileStatement
InsertMultiTabletsStatement
- if (s instanceof PipeEnrichedLoadTsFileStatement) {
- s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement();
- }
+ // LoadTsFileStatement InsertMultiTabletsStatement
if (s instanceof LoadTsFileStatement) {
LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s;
for (int i = 0; i < loadTsFileStatement.getResources().size();
i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 68f87bcc86f..e791faad552 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -36,13 +36,13 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -253,20 +253,7 @@ public class TriggerFireVisitor extends
PlanVisitor<TriggerFireResult, TriggerEv
@Override
public TriggerFireResult visitPipeEnrichedInsert(
PipeEnrichedInsertNode node, TriggerEvent context) {
- final InsertNode realInsertNode = node.getInsertNode();
- if (realInsertNode instanceof InsertRowNode) {
- return visitInsertRow((InsertRowNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertTabletNode) {
- return visitInsertTablet((InsertTabletNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertRowsNode) {
- return visitInsertRows((InsertRowsNode) realInsertNode, context);
- } else if (realInsertNode instanceof InsertMultiTabletsNode) {
- return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode,
context);
- } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) {
- return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode)
realInsertNode, context);
- } else {
- return visitPlan(realInsertNode, context);
- }
+ return node.getInsertNode().accept(this, context);
}
private Map<String, Integer> constructMeasurementToSchemaIndexMap(