This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch revert-pipe-mark in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e66ef6d6376a3c01d850def5001d8d8f6396b3d5 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Dec 15 10:08:30 2023 +0800 Revert "Pipe Schema: Receiver Agent: Added pipe enriched planNode to enable pipe request detection to configure "forwarding-pipe-request" (#11672)" This reverts commit 3e4c6a4df918513cb1c002614da3837090a4dfc8. --- .../org/apache/iotdb/db/audit/AuditLogger.java | 2 +- .../dataregion/DataExecutionVisitor.java | 29 ++- .../schemaregion/SchemaExecutionVisitor.java | 14 -- .../protocol/writeback/WriteBackConnector.java | 4 +- .../receiver/thrift/IoTDBThriftReceiverV1.java | 10 +- .../execution/executor/RegionWriteExecutor.java | 276 +++------------------ .../queryengine/plan/analyze/AnalyzeVisitor.java | 42 +++- .../queryengine/plan/execution/QueryExecution.java | 10 +- .../plan/planner/LogicalPlanVisitor.java | 56 +++-- .../plan/planner/plan/node/PlanNode.java | 10 - .../plan/planner/plan/node/PlanNodeType.java | 18 +- .../plan/planner/plan/node/PlanVisitor.java | 23 +- .../planner/plan/node/load/LoadTsFileNode.java | 7 +- .../node/metedata/write/CreateTimeSeriesNode.java | 2 +- .../metedata/write/view/AlterLogicalViewNode.java | 43 +++- .../node/pipe/PipeEnrichedConfigSchemaNode.java | 161 ------------ .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 169 ------------- .../node/pipe/PipeEnrichedWriteSchemaNode.java | 192 -------------- .../plan/planner/plan/node/write/InsertNode.java | 10 + .../{pipe => write}/PipeEnrichedInsertNode.java | 19 +- .../queryengine/plan/statement/StatementType.java | 3 +- .../plan/statement/StatementVisitor.java | 13 +- .../crud/PipeEnrichedInsertBaseStatement.java | 220 ++++++++++++++++ .../crud/PipeEnrichedLoadTsFileStatement.java | 137 ++++++++++ .../plan/statement/pipe/PipeEnrichedStatement.java | 72 ------ .../quotas/DataNodeThrottleQuotaManager.java | 2 +- .../rescon/quotas/DefaultOperationQuota.java | 16 +- .../db/trigger/executor/TriggerFireVisitor.java | 17 +- 28 files changed, 595 insertions(+), 982 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java index 4f6cecb32a4..ed503128aee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java @@ -215,7 +215,7 @@ public class AuditLogger { case BATCH_INSERT_ROWS: case BATCH_INSERT_ONE_DEVICE: case MULTI_BATCH_INSERT: - case PIPE_ENRICHED: + case PIPE_ENRICHED_INSERT: case DELETE: case SELECT_INTO: case LOAD_FILES: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 845532bc96d..53813fb93ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -27,14 +27,14 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -169,8 +169,23 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { @Override public TSStatus visitPipeEnrichedInsert(PipeEnrichedInsertNode node, DataRegion context) { - node.getInsertNode().markAsGeneratedByPipe(); - return node.getInsertNode().accept(this, context); + final InsertNode realInsertNode = node.getInsertNode(); + + realInsertNode.markAsGeneratedByPipe(); + + if (realInsertNode instanceof InsertRowNode) { + return visitInsertRow((InsertRowNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertTabletNode) { + return visitInsertTablet((InsertTabletNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsNode) { + return visitInsertRows((InsertRowsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertMultiTabletsNode) { + return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) { + return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) realInsertNode, context); + } else { + return visitPlan(realInsertNode, context); + } } @Override @@ -200,10 +215,4 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode()); } } - - @Override - public TSStatus visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, DataRegion context) { - node.getDeleteDataNode().markAsGeneratedByPipe(); - return visitDeleteData((DeleteDataNode) node.getDeleteDataNode(), context); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index a1496e7b6d7..904863ddcc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -50,8 +50,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan; @@ -522,18 +520,6 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> } } - @Override - public TSStatus visitPipeEnrichedWriteSchema( - PipeEnrichedWriteSchemaNode node, ISchemaRegion schemaRegion) { - return node.getWriteSchemaNode().accept(this, schemaRegion); - } - - @Override - public TSStatus visitPipeEnrichedConfigSchema( - PipeEnrichedConfigSchemaNode node, ISchemaRegion schemaRegion) { - return node.getConfigSchemaNode().accept(this, schemaRegion); - } - @Override public TSStatus visitPlan(PlanNode node, ISchemaRegion context) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 9a2883e6935..eb262704c68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -36,7 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -159,7 +159,7 @@ public class WriteBackConnector implements PipeConnector { private TSStatus executeStatement(InsertBaseStatement statement) { return Coordinator.getInstance() .execute( - new PipeEnrichedStatement(statement), + new PipeEnrichedInsertBaseStatement(statement), SessionManager.getInstance().requestQueryId(), new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault().getId()), "", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java index 128d59af02b..03c58874292 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java @@ -48,11 +48,13 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.rpc.RpcUtils; @@ -554,7 +556,11 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement."); } - statement = new PipeEnrichedStatement(statement); + if (statement instanceof InsertBaseStatement) { + statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) statement); + } else if (statement instanceof LoadTsFileStatement) { + statement = new PipeEnrichedLoadTsFileStatement((LoadTsFileStatement) statement); + } final ExecutionResult result = Coordinator.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 7c6348fdbb4..8c53c63bddb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -52,9 +52,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -62,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; @@ -106,10 +104,6 @@ public class RegionWriteExecutor { private final TriggerFireVisitor triggerFireVisitor; - private final WritePlanNodeExecutionVisitor executionVisitor; - - private final PipeEnrichedWriteSchemaNodeExecutionVisitor pipeExecutionVisitor; - public RegionWriteExecutor() { dataRegionConsensus = DataRegionConsensusImpl.getInstance(); schemaRegionConsensus = SchemaRegionConsensusImpl.getInstance(); @@ -117,8 +111,6 @@ public class RegionWriteExecutor { schemaEngine = SchemaEngine.getInstance(); clusterTemplateManager = ClusterTemplateManager.getInstance(); triggerFireVisitor = new TriggerFireVisitor(); - executionVisitor = new WritePlanNodeExecutionVisitor(); - pipeExecutionVisitor = new PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor); } @TestOnly @@ -135,8 +127,6 @@ public class RegionWriteExecutor { this.schemaEngine = schemaEngine; this.clusterTemplateManager = clusterTemplateManager; this.triggerFireVisitor = triggerFireVisitor; - executionVisitor = new WritePlanNodeExecutionVisitor(); - pipeExecutionVisitor = new PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor); } @SuppressWarnings("squid:S1181") @@ -144,6 +134,7 @@ public class RegionWriteExecutor { try { WritePlanNodeExecutionContext context = new WritePlanNodeExecutionContext(groupId, regionManager.getRegionLock(groupId)); + WritePlanNodeExecutionVisitor executionVisitor = new WritePlanNodeExecutionVisitor(); return planNode.accept(executionVisitor, context); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); @@ -255,13 +246,13 @@ public class RegionWriteExecutor { } } - private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode insertNode) + private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException { long triggerCostTime = 0; TSStatus status; long startTime = System.nanoTime(); // fire Trigger before the insertion - TriggerFireResult result = triggerFireVisitor.process(insertNode, TriggerEvent.BEFORE_INSERT); + TriggerFireResult result = triggerFireVisitor.process(planNode, TriggerEvent.BEFORE_INSERT); triggerCostTime += (System.nanoTime() - startTime); if (result.equals(TriggerFireResult.TERMINATION)) { status = @@ -270,14 +261,14 @@ public class RegionWriteExecutor { "Failed to complete the insertion because trigger error before the insertion."); } else { long startWriteTime = System.nanoTime(); - status = dataRegionConsensus.write(groupId, insertNode); + status = dataRegionConsensus.write(groupId, planNode); PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - startWriteTime); // fire Trigger after the insertion startTime = System.nanoTime(); boolean hasFailedTriggerBeforeInsertion = result.equals(TriggerFireResult.FAILED_NO_TERMINATION); - result = triggerFireVisitor.process(insertNode, TriggerEvent.AFTER_INSERT); + result = triggerFireVisitor.process(planNode, TriggerEvent.AFTER_INSERT); if (hasFailedTriggerBeforeInsertion || !result.equals(TriggerFireResult.SUCCESS)) { status = RpcUtils.getStatus( @@ -290,18 +281,6 @@ public class RegionWriteExecutor { return status; } - @Override - public RegionExecutionResult visitPipeEnrichedDeleteData( - PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext context) { - // data deletion should block data insertion, especially when executed for deleting timeseries - context.getRegionWriteValidationRWLock().writeLock().lock(); - try { - return super.visitPipeEnrichedDeleteData(node, context); - } finally { - context.getRegionWriteValidationRWLock().writeLock().unlock(); - } - } - @Override public RegionExecutionResult visitDeleteData( DeleteDataNode node, WritePlanNodeExecutionContext context) { @@ -317,13 +296,6 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitCreateTimeSeries( CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeCreateTimeSeries(node, context, false); - } - - private RegionExecutionResult executeCreateTimeSeries( - CreateTimeSeriesNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); RegionExecutionResult result = @@ -340,9 +312,7 @@ public class RegionWriteExecutor { Collections.singletonList(node.getPath().getMeasurement()), Collections.singletonList(node.getAlias())); if (failingMeasurementMap.isEmpty()) { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateTimeSeries(node, context); + return super.visitCreateTimeSeries(node, context); } else { MetadataException metadataException = failingMeasurementMap.get(0); LOGGER.error(METADATA_ERROR_MSG, metadataException); @@ -358,22 +328,13 @@ public class RegionWriteExecutor { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateTimeSeries(node, context); + return super.visitCreateTimeSeries(node, context); } } @Override public RegionExecutionResult visitCreateAlignedTimeSeries( CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeCreateAlignedTimeSeries(node, context, false); - } - - private RegionExecutionResult executeCreateAlignedTimeSeries( - CreateAlignedTimeSeriesNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); RegionExecutionResult result = @@ -389,9 +350,7 @@ public class RegionWriteExecutor { schemaRegion.checkMeasurementExistence( node.getDevicePath(), node.getMeasurements(), node.getAliasList()); if (failingMeasurementMap.isEmpty()) { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateAlignedTimeSeries(node, context); + return super.visitCreateAlignedTimeSeries(node, context); } else { MetadataException metadataException = failingMeasurementMap.values().iterator().next(); LOGGER.error(METADATA_ERROR_MSG, metadataException); @@ -407,22 +366,13 @@ public class RegionWriteExecutor { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateAlignedTimeSeries(node, context); + return super.visitCreateAlignedTimeSeries(node, context); } } @Override public RegionExecutionResult visitCreateMultiTimeSeries( CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeCreateMultiTimeSeries(node, context, false); - } - - private RegionExecutionResult executeCreateMultiTimeSeries( - CreateMultiTimeSeriesNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); RegionExecutionResult result; @@ -450,8 +400,7 @@ public class RegionWriteExecutor { } RegionExecutionResult failingResult = - registerTimeSeries( - measurementGroupMap, node, context, failingStatus, receivedFromPipe); + registerTimeSeries(measurementGroupMap, node, context, failingStatus); if (failingResult != null) { return failingResult; @@ -467,9 +416,7 @@ public class RegionWriteExecutor { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateMultiTimeSeries(node, context); + return super.visitCreateMultiTimeSeries(node, context); } } @@ -508,14 +455,10 @@ public class RegionWriteExecutor { Map<PartialPath, MeasurementGroup> measurementGroupMap, CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context, - List<TSStatus> failingStatus, - boolean receivedFromPipe) { + List<TSStatus> failingStatus) { if (!measurementGroupMap.isEmpty()) { // try registering the rest timeseries - RegionExecutionResult executionResult = - receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateMultiTimeSeries(node, context); + RegionExecutionResult executionResult = super.visitCreateMultiTimeSeries(node, context); if (failingStatus.isEmpty()) { return executionResult; } @@ -533,13 +476,6 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitInternalCreateTimeSeries( InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeInternalCreateTimeSeries(node, context, false); - } - - private RegionExecutionResult executeInternalCreateTimeSeries( - InternalCreateTimeSeriesNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); RegionExecutionResult result = @@ -583,32 +519,20 @@ public class RegionWriteExecutor { measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); return processExecutionResultOfInternalCreateSchema( - receivedFromPipe - ? super.visitPipeEnrichedWriteSchema( - new PipeEnrichedWriteSchemaNode(node), context) - : super.visitInternalCreateTimeSeries(node, context), + super.visitInternalCreateTimeSeries(node, context), failingStatus, alreadyExistingStatus); } finally { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitInternalCreateTimeSeries(node, context); + return super.visitInternalCreateTimeSeries(node, context); } } @Override public RegionExecutionResult visitInternalCreateMultiTimeSeries( InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeInternalCreateMultiTimeSeries(node, context, false); - } - - private RegionExecutionResult executeInternalCreateMultiTimeSeries( - InternalCreateMultiTimeSeriesNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); RegionExecutionResult result; @@ -662,19 +586,14 @@ public class RegionWriteExecutor { } return processExecutionResultOfInternalCreateSchema( - receivedFromPipe - ? super.visitPipeEnrichedWriteSchema( - new PipeEnrichedWriteSchemaNode(node), context) - : super.visitInternalCreateMultiTimeSeries(node, context), + super.visitInternalCreateMultiTimeSeries(node, context), failingStatus, alreadyExistingStatus); } finally { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitInternalCreateMultiTimeSeries(node, context); + return super.visitInternalCreateMultiTimeSeries(node, context); } } @@ -757,22 +676,17 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitAlterTimeSeries( AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return executeAlterTimeSeries(node, context, false); - } - - private RegionExecutionResult executeAlterTimeSeries( - AlterTimeSeriesNode node, WritePlanNodeExecutionContext context, boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); try { MeasurementPath measurementPath = schemaRegion.fetchMeasurementPath(node.getPath()); - if (node.isAlterView() && !measurementPath.getMeasurementSchema().isLogicalView()) { - throw new MetadataException( - String.format("%s is not view.", measurementPath.getFullPath())); + if (node.isAlterView()) { + if (!measurementPath.getMeasurementSchema().isLogicalView()) { + throw new MetadataException( + String.format("%s is not view.", measurementPath.getFullPath())); + } } - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitAlterTimeSeries(node, context); + return super.visitAlterTimeSeries(node, context); } catch (MetadataException e) { RegionExecutionResult result = new RegionExecutionResult(); result.setAccepted(true); @@ -785,13 +699,6 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitActivateTemplate( ActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return executeActivateTemplate(node, context, false); - } - - private RegionExecutionResult executeActivateTemplate( - ActivateTemplateNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { // activate template operation shall be blocked by unset template check context.getRegionWriteValidationRWLock().readLock().lock(); try { @@ -815,13 +722,7 @@ public class RegionWriteExecutor { RegionExecutionResult result = checkQuotaBeforeCreatingTimeSeries( schemaRegion, node.getActivatePath(), templateSetInfo.left.getMeasurementNumber()); - if (result == null) { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitActivateTemplate(node, context); - } else { - return result; - } + return result == null ? super.visitActivateTemplate(node, context) : result; } finally { context.getRegionWriteValidationRWLock().readLock().unlock(); } @@ -830,13 +731,6 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitBatchActivateTemplate( BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return executeBatchActivateTemplate(node, context, false); - } - - private RegionExecutionResult executeBatchActivateTemplate( - BatchActivateTemplateNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { // activate template operation shall be blocked by unset template check context.getRegionWriteValidationRWLock().readLock().lock(); try { @@ -866,9 +760,7 @@ public class RegionWriteExecutor { } } - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitBatchActivateTemplate(node, context); + return super.visitBatchActivateTemplate(node, context); } finally { context.getRegionWriteValidationRWLock().readLock().unlock(); } @@ -877,13 +769,6 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitInternalBatchActivateTemplate( InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return executeInternalBatchActivateTemplate(node, context, false); - } - - private RegionExecutionResult executeInternalBatchActivateTemplate( - InternalBatchActivateTemplateNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { // activate template operation shall be blocked by unset template check context.getRegionWriteValidationRWLock().readLock().lock(); try { @@ -916,9 +801,7 @@ public class RegionWriteExecutor { } } - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitInternalBatchActivateTemplate(node, context); + return super.visitInternalBatchActivateTemplate(node, context); } finally { context.getRegionWriteValidationRWLock().readLock().unlock(); } @@ -927,19 +810,12 @@ public class RegionWriteExecutor { @Override public RegionExecutionResult visitCreateLogicalView( CreateLogicalViewNode node, WritePlanNodeExecutionContext context) { - return executeCreateLogicalView(node, context, false); - } - - private RegionExecutionResult executeCreateLogicalView( - CreateLogicalViewNode node, - WritePlanNodeExecutionContext context, - boolean receivedFromPipe) { ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { context.getRegionWriteValidationRWLock().writeLock().lock(); try { - // step 1. make sure all target paths do NOT exist. + // step 1. make sure all target paths are NOT exist. List<PartialPath> targetPaths = node.getViewPathList(); List<MetadataException> failingMetadataException = new ArrayList<>(); for (PartialPath thisPath : targetPaths) { @@ -949,12 +825,12 @@ public class RegionWriteExecutor { thisPath.getDevicePath(), Collections.singletonList(thisPath.getMeasurement()), null); - // merge all exceptions into one map + // merge all exception into one map for (Map.Entry<Integer, MetadataException> entry : failingMeasurementMap.entrySet()) { failingMetadataException.add(entry.getValue()); } } - // if there are some exceptions, handle each exception and return first of them. + // if there is some exception, handle each exception and return first of them. if (!failingMetadataException.isEmpty()) { MetadataException metadataException = failingMetadataException.get(0); LOGGER.error(METADATA_ERROR_MSG, metadataException); @@ -966,102 +842,16 @@ public class RegionWriteExecutor { metadataException.getErrorCode(), metadataException.getMessage())); return result; } - // step 2. make sure all source paths exist. - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateLogicalView(node, context); + // step 2. make sure all source paths are existed. + return super.visitCreateLogicalView(node, context); } finally { context.getRegionWriteValidationRWLock().writeLock().unlock(); } } else { - return receivedFromPipe - ? super.visitPipeEnrichedWriteSchema(new PipeEnrichedWriteSchemaNode(node), context) - : super.visitCreateLogicalView(node, context); + return super.visitCreateLogicalView(node, context); } // end of visitCreateLogicalView } - - @Override - public RegionExecutionResult visitPipeEnrichedWriteSchema( - PipeEnrichedWriteSchemaNode node, WritePlanNodeExecutionContext context) { - return node.getWriteSchemaNode().accept(pipeExecutionVisitor, context); - } - } - - private class PipeEnrichedWriteSchemaNodeExecutionVisitor - extends PlanVisitor<RegionExecutionResult, WritePlanNodeExecutionContext> { - - WritePlanNodeExecutionVisitor visitor; - - private PipeEnrichedWriteSchemaNodeExecutionVisitor(WritePlanNodeExecutionVisitor visitor) { - this.visitor = visitor; - } - - @Override - public RegionExecutionResult visitPlan(PlanNode node, WritePlanNodeExecutionContext context) { - throw new UnsupportedOperationException( - "PipeEnrichedWriteSchemaNodeExecutionVisitor does not support visiting general plan."); - } - - @Override - public RegionExecutionResult visitCreateTimeSeries( - CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeCreateTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitCreateAlignedTimeSeries( - CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeCreateAlignedTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitCreateMultiTimeSeries( - CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeCreateMultiTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitInternalCreateTimeSeries( - InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeInternalCreateTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitInternalCreateMultiTimeSeries( - InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeInternalCreateMultiTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitAlterTimeSeries( - AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) { - return visitor.executeAlterTimeSeries(node, context, true); - } - - @Override - public RegionExecutionResult visitActivateTemplate( - ActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return visitor.executeActivateTemplate(node, context, true); - } - - @Override - public RegionExecutionResult visitBatchActivateTemplate( - BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return visitor.executeBatchActivateTemplate(node, context, true); - } - - @Override - public RegionExecutionResult visitInternalBatchActivateTemplate( - InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { - return visitor.executeInternalBatchActivateTemplate(node, context, true); - } - - @Override - public RegionExecutionResult visitCreateLogicalView( - CreateLogicalViewNode node, WritePlanNodeExecutionContext context) { - return visitor.executeCreateLogicalView(node, context, true); - } } private static class WritePlanNodeExecutionContext { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 6ddf58895ac..89842c3d1af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -100,6 +100,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement; @@ -131,7 +133,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPath import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowSchemaTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; @@ -2569,13 +2570,33 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } @Override - public Analysis visitPipeEnrichedStatement( - PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) { - Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, context); + public Analysis visitPipeEnrichedInsert( + PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, MPPQueryContext context) { + Analysis analysis; + + final InsertBaseStatement insertBaseStatement = + pipeEnrichedInsertBaseStatement.getInsertBaseStatement(); + if (insertBaseStatement instanceof InsertTabletStatement) { + analysis = visitInsertTablet((InsertTabletStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) { + analysis = + visitInsertMultiTablets((InsertMultiTabletsStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertRowStatement) { + analysis = visitInsertRow((InsertRowStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertRowsStatement) { + analysis = visitInsertRows((InsertRowsStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) { + analysis = + visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceStatement) insertBaseStatement, context); + } else { + throw new UnsupportedOperationException( + "Unsupported insert statement type: " + insertBaseStatement.getClass().getName()); + } // statement may be changed because of logical view - pipeEnrichedStatement.setInnerStatement(analysis.getStatement()); - analysis.setStatement(pipeEnrichedStatement); + pipeEnrichedInsertBaseStatement.setInsertBaseStatement( + (InsertBaseStatement) analysis.getStatement()); + analysis.setStatement(pipeEnrichedInsertBaseStatement); return analysis; } @@ -2632,6 +2653,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> .analyzeFileByFile(); } + @Override + public Analysis visitPipeEnrichedLoadFile( + PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, MPPQueryContext context) { + final Analysis analysis = + visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), context); + analysis.setStatement(pipeEnrichedLoadTsFileStatement); + return analysis; + } + /** get analysis according to statement and params */ private Analysis getAnalysisForWriting( Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 9af8cfa867b..ce33e169b36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -65,7 +65,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.rpc.RpcUtils; @@ -318,11 +318,7 @@ public class QueryExecution implements IQueryExecution { private void schedule() { final long startTime = System.nanoTime(); - boolean isPipeEnrichedTsFileLoad = - rawStatement instanceof PipeEnrichedStatement - && ((PipeEnrichedStatement) rawStatement).getInnerStatement() - instanceof LoadTsFileStatement; - if (rawStatement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) { + if (rawStatement instanceof LoadTsFileStatement) { this.scheduler = new LoadTsFileScheduler( distributedPlan, @@ -330,7 +326,7 @@ public class QueryExecution implements IQueryExecution { stateMachine, syncInternalServiceClientManager, partitionFetcher, - isPipeEnrichedTsFileLoad); + rawStatement instanceof PipeEnrichedLoadTsFileStatement); this.scheduler.start(); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index e55521ea107..090c2a55a6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; @@ -44,9 +43,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; @@ -55,16 +51,20 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement; @@ -87,7 +87,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPathsUsingTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -559,20 +558,37 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte } @Override - public PlanNode visitPipeEnrichedStatement( - PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) { - WritePlanNode node = - (WritePlanNode) pipeEnrichedStatement.getInnerStatement().accept(this, context); - - if (node instanceof LoadTsFileNode) { - return node; - } else if (node instanceof InsertNode) { - return new PipeEnrichedInsertNode((InsertNode) node); - } else if (node instanceof DeleteDataNode) { - return new PipeEnrichedDeleteDataNode((DeleteDataNode) node); + public PlanNode visitPipeEnrichedInsert( + PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, MPPQueryContext context) { + InsertNode insertNode; + + final InsertBaseStatement insertBaseStatement = + pipeEnrichedInsertBaseStatement.getInsertBaseStatement(); + if (insertBaseStatement instanceof InsertRowStatement) { + insertNode = + (InsertRowNode) visitInsertRow((InsertRowStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertRowsStatement) { + insertNode = + (InsertRowsNode) visitInsertRows((InsertRowsStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertRowsOfOneDeviceStatement) { + insertNode = + (InsertRowsOfOneDeviceNode) + visitInsertRowsOfOneDevice( + (InsertRowsOfOneDeviceStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertTabletStatement) { + insertNode = + (InsertTabletNode) + visitInsertTablet((InsertTabletStatement) insertBaseStatement, context); + } else if (insertBaseStatement instanceof InsertMultiTabletsStatement) { + insertNode = + (InsertMultiTabletsNode) + visitInsertMultiTablets((InsertMultiTabletsStatement) insertBaseStatement, context); + } else { + throw new UnsupportedOperationException( + "Unsupported insert statement type: " + insertBaseStatement.getClass().getName()); } - return new PipeEnrichedWriteSchemaNode(node); + return new PipeEnrichedInsertNode(insertNode); } @Override @@ -581,6 +597,12 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources()); } + @Override + public PlanNode visitPipeEnrichedLoadFile( + PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, MPPQueryContext context) { + return visitLoadFile(pipeEnrichedLoadTsFileStatement.getLoadTsFileStatement(), context); + } + @Override public PlanNode visitShowTimeSeries( ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java index 18f678974b7..2b73576ea03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java @@ -46,8 +46,6 @@ public abstract class PlanNode implements IConsensusRequest { protected PlanNodeId id; - protected boolean isGeneratedByPipe = false; - protected PlanNode(PlanNodeId id) { requireNonNull(id, "id is null"); this.id = id; @@ -61,14 +59,6 @@ public abstract class PlanNode implements IConsensusRequest { this.id = id; } - public boolean isGeneratedByPipe() { - return isGeneratedByPipe; - } - - public void markAsGeneratedByPipe() { - isGeneratedByPipe = true; - } - public abstract List<PlanNode> getChildren(); public abstract void addChild(PlanNode child); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index a17ea3493fd..ea819b466f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -56,10 +56,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; @@ -101,6 +97,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; @@ -192,11 +189,7 @@ public enum PlanNodeType { LAST_QUERY_TRANSFORM((short) 81), TOP_K((short) 82), - COLUMN_INJECT((short) 83), - PIPE_ENRICHED_DELETE_DATA((short) 84), - PIPE_ENRICHED_WRITE_SCHEMA((short) 85), - PIPE_ENRICHED_DELETE_SCHEMA((short) 86), - ; + COLUMN_INJECT((short) 83); public static final int BYTES = Short.BYTES; @@ -411,13 +404,6 @@ public enum PlanNodeType { return TopKNode.deserialize(buffer); case 83: return ColumnInjectNode.deserialize(buffer); - case 84: - return PipeEnrichedDeleteDataNode.deserialize(buffer); - case 85: - return PipeEnrichedWriteSchemaNode.deserialize(buffer); - case 86: - return PipeEnrichedConfigSchemaNode.deserialize(buffer); - default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 104242486d7..9dbb3df3b57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -53,10 +53,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedConfigSchemaNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; @@ -101,6 +97,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; public abstract class PlanVisitor<R, C> { @@ -442,27 +439,11 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } - public R visitDeleteData(DeleteDataNode node, C context) { - return visitPlan(node, context); - } - - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Pipe Enriched Node - ///////////////////////////////////////////////////////////////////////////////////////////////// - public R visitPipeEnrichedInsert(PipeEnrichedInsertNode node, C context) { return visitPlan(node, context); } - public R visitPipeEnrichedDeleteData(PipeEnrichedDeleteDataNode node, C context) { - return visitPlan(node, context); - } - - public R visitPipeEnrichedWriteSchema(PipeEnrichedWriteSchemaNode node, C context) { - return visitPlan(node, context); - } - - public R visitPipeEnrichedConfigSchema(PipeEnrichedConfigSchemaNode node, C context) { + public R visitDeleteData(DeleteDataNode node, C context) { return visitPlan(node, context); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index 997f5bd89e6..23a5620a611 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@ -89,11 +88,7 @@ public class LoadTsFileNode extends WritePlanNode { @Override public List<WritePlanNode> splitByPartition(Analysis analysis) { List<WritePlanNode> res = new ArrayList<>(); - LoadTsFileStatement statement = - analysis.getStatement() instanceof PipeEnrichedStatement - ? (LoadTsFileStatement) - ((PipeEnrichedStatement) analysis.getStatement()).getInnerStatement() - : (LoadTsFileStatement) analysis.getStatement(); + LoadTsFileStatement statement = (LoadTsFileStatement) analysis.getStatement(); for (int i = 0; i < resources.size(); i++) { res.add( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java index 060acc2f9ca..43b8f9c6510 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java @@ -187,7 +187,7 @@ public class CreateTimeSeriesNode extends WritePlanNode implements ICreateTimeSe public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) { String id; - PartialPath path; + PartialPath path = null; TSDataType dataType; TSEncoding encoding; CompressionType compressor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java index 664575e1bf7..4122ebfb461 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/AlterLogicalViewNode.java @@ -19,13 +19,16 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -38,7 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -public class AlterLogicalViewNode extends PlanNode { +public class AlterLogicalViewNode extends WritePlanNode { /** * A map from target path to source expression. Yht target path is the name of this logical view, @@ -46,6 +49,12 @@ public class AlterLogicalViewNode extends PlanNode { */ private final Map<PartialPath, ViewExpression> viewPathToSourceMap; + /** + * This variable will be set in function splitByPartition() according to analysis. And it will be + * set when creating new split nodes. + */ + private final TRegionReplicaSet regionReplicaSet = null; + public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression> viewPathToSourceMap) { super(id); this.viewPathToSourceMap = viewPathToSourceMap; @@ -62,6 +71,11 @@ public class AlterLogicalViewNode extends PlanNode { return visitor.visitAlterLogicalView(this, context); } + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return this.regionReplicaSet; + } + @Override public List<PlanNode> getChildren() { return new ArrayList<>(); @@ -146,5 +160,32 @@ public class AlterLogicalViewNode extends PlanNode { PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap); } + + @Override + public List<WritePlanNode> splitByPartition(Analysis analysis) { + Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new HashMap<>(); + for (Map.Entry<PartialPath, ViewExpression> entry : this.viewPathToSourceMap.entrySet()) { + // for each entry in the map for target path to source expression, + // build a map from TRegionReplicaSet to this entry. + // Please note that getSchemaRegionReplicaSet needs a device path as parameter. + TRegionReplicaSet regionReplicaSet = + analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getDevice()); + + // create a map if the key(regionReplicaSet) is not exists, + // then put this entry into this map(from regionReplicaSet to this entry) + splitMap + .computeIfAbsent(regionReplicaSet, k -> new HashMap<>()) + .put(entry.getKey(), entry.getValue()); + } + + // split this node into several nodes according to their regionReplicaSet + List<WritePlanNode> result = new ArrayList<>(); + for (Map.Entry<TRegionReplicaSet, Map<PartialPath, ViewExpression>> entry : + splitMap.entrySet()) { + // for each entry in splitMap, create a plan node. + result.add(new CreateLogicalViewNode(getPlanNodeId(), entry.getValue(), entry.getKey())); + } + return result; + } // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java deleted file mode 100644 index 1ff6cabb7a2..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedConfigSchemaNode.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe; - -import org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor; -import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode; -import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * This class aims to mark the {@link PlanNode} of schema operation assigned by configNode to - * selectively forward the operations, {@link Statement}s of which extending {@link - * IConfigStatement}. The handling logic is defined only in {@link SchemaExecutionVisitor} to - * execute deletion logic and mark it as received, since the request does not need to pass {@link - * RegionWriteExecutor} and is assigned directly to regions by configNode. - * - * <p> - * - * <p>Notes: The contents includes all the collected {@link PlanNode}s of schema deletion: - * - * <p>- Timeseries: {@link DeleteTimeSeriesNode} - * - * <p>- Template: {@link DeactivateTemplateNode} - * - * <p>- LogicalView: {@link AlterLogicalViewNode}, {@link DeleteLogicalViewNode} - * - * <p>Intermediate nodes like {@link ConstructSchemaBlackListNode} will not be included since they - * will not be collected by pipe. - */ -public class PipeEnrichedConfigSchemaNode extends PlanNode { - - private final PlanNode configSchemaNode; - - public PipeEnrichedConfigSchemaNode(PlanNode configSchemaNode) { - super(configSchemaNode.getPlanNodeId()); - this.configSchemaNode = configSchemaNode; - } - - public PlanNode getConfigSchemaNode() { - return configSchemaNode; - } - - @Override - public boolean isGeneratedByPipe() { - return configSchemaNode.isGeneratedByPipe(); - } - - @Override - public void markAsGeneratedByPipe() { - configSchemaNode.markAsGeneratedByPipe(); - } - - @Override - public PlanNodeId getPlanNodeId() { - return configSchemaNode.getPlanNodeId(); - } - - @Override - public void setPlanNodeId(PlanNodeId id) { - configSchemaNode.setPlanNodeId(id); - } - - @Override - public List<PlanNode> getChildren() { - return configSchemaNode.getChildren(); - } - - @Override - public void addChild(PlanNode child) { - configSchemaNode.addChild(child); - } - - @Override - public PlanNode clone() { - return new PipeEnrichedConfigSchemaNode(configSchemaNode.clone()); - } - - @Override - public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new PipeEnrichedConfigSchemaNode( - configSchemaNode.createSubNode(subNodeId, startIndex, endIndex)); - } - - @Override - public PlanNode cloneWithChildren(List<PlanNode> children) { - return new PipeEnrichedConfigSchemaNode(configSchemaNode.cloneWithChildren(children)); - } - - @Override - public int allowedChildCount() { - return configSchemaNode.allowedChildCount(); - } - - @Override - public List<String> getOutputColumnNames() { - return configSchemaNode.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitPipeEnrichedConfigSchema(this, context); - } - - @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(byteBuffer); - configSchemaNode.serialize(byteBuffer); - } - - @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.PIPE_ENRICHED_DELETE_SCHEMA.serialize(stream); - configSchemaNode.serialize(stream); - } - - public static PipeEnrichedConfigSchemaNode deserialize(ByteBuffer buffer) { - return new PipeEnrichedConfigSchemaNode(PlanNodeType.deserialize(buffer)); - } - - @Override - public boolean equals(Object o) { - return o instanceof PipeEnrichedConfigSchemaNode - && configSchemaNode.equals(((PipeEnrichedConfigSchemaNode) o).configSchemaNode); - } - - @Override - public int hashCode() { - return configSchemaNode.hashCode(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java deleted file mode 100644 index 29d2e011c58..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe; - -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor; -import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor; -import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; - -/** - * This class aims to mark the {@link DeleteDataNode} to enable selectively forwarding pipe - * deletion. The handling logic is defined in: - * - * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data region. - * - * <p>2.{@link DataExecutionVisitor}, to actually write data on data region and mark it as received - * from pipe. - */ -public class PipeEnrichedDeleteDataNode extends DeleteDataNode { - - private final DeleteDataNode deleteDataNode; - - public PipeEnrichedDeleteDataNode(DeleteDataNode deleteDataNode) { - super( - deleteDataNode.getPlanNodeId(), - deleteDataNode.getPathList(), - deleteDataNode.getDeleteStartTime(), - deleteDataNode.getDeleteEndTime()); - this.deleteDataNode = deleteDataNode; - } - - public PlanNode getDeleteDataNode() { - return deleteDataNode; - } - - @Override - public boolean isGeneratedByPipe() { - return deleteDataNode.isGeneratedByPipe(); - } - - @Override - public void markAsGeneratedByPipe() { - deleteDataNode.markAsGeneratedByPipe(); - } - - @Override - public PlanNodeId getPlanNodeId() { - return deleteDataNode.getPlanNodeId(); - } - - @Override - public void setPlanNodeId(PlanNodeId id) { - deleteDataNode.setPlanNodeId(id); - } - - @Override - public List<PlanNode> getChildren() { - return deleteDataNode.getChildren(); - } - - @Override - public void addChild(PlanNode child) { - deleteDataNode.addChild(child); - } - - @Override - public DeleteDataNode clone() { - return new PipeEnrichedDeleteDataNode((DeleteDataNode) deleteDataNode.clone()); - } - - @Override - public DeleteDataNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new PipeEnrichedDeleteDataNode( - (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, endIndex)); - } - - @Override - public PlanNode cloneWithChildren(List<PlanNode> children) { - return new PipeEnrichedDeleteDataNode( - (DeleteDataNode) deleteDataNode.cloneWithChildren(children)); - } - - @Override - public int allowedChildCount() { - return deleteDataNode.allowedChildCount(); - } - - @Override - public List<String> getOutputColumnNames() { - return deleteDataNode.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitPipeEnrichedDeleteData(this, context); - } - - @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(byteBuffer); - deleteDataNode.serialize(byteBuffer); - } - - @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.PIPE_ENRICHED_DELETE_DATA.serialize(stream); - deleteDataNode.serialize(stream); - } - - public static PipeEnrichedDeleteDataNode deserialize(ByteBuffer buffer) { - return new PipeEnrichedDeleteDataNode((DeleteDataNode) PlanNodeType.deserialize(buffer)); - } - - @Override - public boolean equals(Object o) { - return o instanceof PipeEnrichedDeleteDataNode - && deleteDataNode.equals(((PipeEnrichedDeleteDataNode) o).deleteDataNode); - } - - @Override - public int hashCode() { - return deleteDataNode.hashCode(); - } - - @Override - public TRegionReplicaSet getRegionReplicaSet() { - return deleteDataNode.getRegionReplicaSet(); - } - - @Override - public List<WritePlanNode> splitByPartition(Analysis analysis) { - return deleteDataNode.splitByPartition(analysis).stream() - .map( - plan -> - plan instanceof PipeEnrichedDeleteDataNode - ? plan - : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan)) - .collect(Collectors.toList()); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java deleted file mode 100644 index 4ac9ffbe69e..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWriteSchemaNode.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe; - -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.db.consensus.statemachine.schemaregion.SchemaExecutionVisitor; -import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor; -import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; - -/** - * This class aims to mark the {@link WritePlanNode} of schema writing to selectively forward pipe - * schema operations, {@link Statement}s of which passing through {@link SchemaExecutionVisitor}. - * The handling logic is defined in: - * - * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target schema region. - * - * <p>2.{@link SchemaExecutionVisitor}, to actually write data on schema region and mark it as - * received from pipe. - * - * <p> - * - * <p>Notes: The content varies in all the {@link WritePlanNode}s of schema writing nodes: - * - * <p>- Timeseries(Manual): {@link CreateTimeSeriesNode}, {@link CreateAlignedTimeSeriesNode}, - * {@link CreateMultiTimeSeriesNode}, {@link AlterTimeSeriesNode} - * - * <p>- Timeseries(Auto): {@link InternalCreateTimeSeriesNode}, {@link - * InternalCreateMultiTimeSeriesNode} - * - * <p>- Template(Manual): {@link ActivateTemplateNode}, {@link BatchActivateTemplateNode} - * - * <p>- Template(Auto): {@link InternalBatchActivateTemplateNode} - * - * <p>- LogicalView: {@link CreateLogicalViewNode} - */ -public class PipeEnrichedWriteSchemaNode extends WritePlanNode { - - private final WritePlanNode writeSchemaNode; - - public PipeEnrichedWriteSchemaNode(WritePlanNode schemaWriteNode) { - super(schemaWriteNode.getPlanNodeId()); - this.writeSchemaNode = schemaWriteNode; - } - - public WritePlanNode getWriteSchemaNode() { - return writeSchemaNode; - } - - @Override - public boolean isGeneratedByPipe() { - return writeSchemaNode.isGeneratedByPipe(); - } - - @Override - public void markAsGeneratedByPipe() { - writeSchemaNode.markAsGeneratedByPipe(); - } - - @Override - public PlanNodeId getPlanNodeId() { - return writeSchemaNode.getPlanNodeId(); - } - - @Override - public void setPlanNodeId(PlanNodeId id) { - writeSchemaNode.setPlanNodeId(id); - } - - @Override - public List<PlanNode> getChildren() { - return writeSchemaNode.getChildren(); - } - - @Override - public void addChild(PlanNode child) { - writeSchemaNode.addChild(child); - } - - @Override - public WritePlanNode clone() { - return new PipeEnrichedWriteSchemaNode((WritePlanNode) writeSchemaNode.clone()); - } - - @Override - public WritePlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new PipeEnrichedWriteSchemaNode( - (WritePlanNode) writeSchemaNode.createSubNode(subNodeId, startIndex, endIndex)); - } - - @Override - public PlanNode cloneWithChildren(List<PlanNode> children) { - return new PipeEnrichedWriteSchemaNode( - (WritePlanNode) writeSchemaNode.cloneWithChildren(children)); - } - - @Override - public int allowedChildCount() { - return writeSchemaNode.allowedChildCount(); - } - - @Override - public List<String> getOutputColumnNames() { - return writeSchemaNode.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitPipeEnrichedWriteSchema(this, context); - } - - @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(byteBuffer); - writeSchemaNode.serialize(byteBuffer); - } - - @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.PIPE_ENRICHED_WRITE_SCHEMA.serialize(stream); - writeSchemaNode.serialize(stream); - } - - public static PipeEnrichedWriteSchemaNode deserialize(ByteBuffer buffer) { - return new PipeEnrichedWriteSchemaNode((WritePlanNode) PlanNodeType.deserialize(buffer)); - } - - @Override - public boolean equals(Object o) { - return o instanceof PipeEnrichedWriteSchemaNode - && writeSchemaNode.equals(((PipeEnrichedWriteSchemaNode) o).writeSchemaNode); - } - - @Override - public int hashCode() { - return writeSchemaNode.hashCode(); - } - - @Override - public TRegionReplicaSet getRegionReplicaSet() { - return writeSchemaNode.getRegionReplicaSet(); - } - - @Override - public List<WritePlanNode> splitByPartition(Analysis analysis) { - return writeSchemaNode.splitByPartition(analysis).stream() - .map( - plan -> - plan instanceof PipeEnrichedWriteSchemaNode - ? plan - : new PipeEnrichedWriteSchemaNode(plan)) - .collect(Collectors.toList()); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index 5f0b83b64fa..0b0fe8da749 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -78,6 +78,8 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons protected ProgressIndex progressIndex; + protected boolean isGeneratedByPipe = false; + protected InsertNode(PlanNodeId id) { super(id); } @@ -174,6 +176,14 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons throw new NotImplementedException("serializeAttributes of InsertNode is not implemented"); } + public boolean isGeneratedByPipe() { + return isGeneratedByPipe; + } + + public void markAsGeneratedByPipe() { + isGeneratedByPipe = true; + } + // region Serialization methods for WAL /** Serialized size of measurement schemas, ignoring failed time series */ protected int serializeMeasurementSchemasSize() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java similarity index 89% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java index 2f2408f8097..28d217176f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java @@ -17,22 +17,18 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe; +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor; -import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID; -import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -42,17 +38,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.stream.Collectors; -/** - * This class aims to mark the {@link InsertNode} to prevent forwarding pipe insertions. The - * handling logic is defined in: - * - * <p>1.{@link RegionWriteExecutor}, to serialize and reach the target data region. - * - * <p>2.{@link TriggerFireVisitor}, to fire the trigger before writing to data region. - * - * <p>3.{@link DataExecutionVisitor}, to actually write data on data region and mark it as received - * from pipe. - */ public class PipeEnrichedInsertNode extends InsertNode { private final InsertNode insertNode; @@ -230,7 +215,7 @@ public class PipeEnrichedInsertNode extends InsertNode { insertNode.serialize(stream); } - public static PipeEnrichedInsertNode deserialize(ByteBuffer buffer) { + public static PlanNode deserialize(ByteBuffer buffer) { return new PipeEnrichedInsertNode((InsertNode) PlanNodeType.deserialize(buffer)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index 2b1cf8a68f8..b673703649a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -64,6 +64,7 @@ public enum StatementType { BATCH_INSERT_ROWS, BATCH_INSERT_ONE_DEVICE, MULTI_BATCH_INSERT, + PIPE_ENRICHED_INSERT, DELETE, @@ -173,6 +174,4 @@ public enum StatementType { DELETE_LOGICAL_VIEW, RENAME_LOGICAL_VIEW, ALTER_LOGICAL_VIEW, - - PIPE_ENRICHED, } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index c824593bd15..2d34e5affba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -27,6 +27,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement; @@ -96,7 +98,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ClearCacheStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement; @@ -282,6 +283,11 @@ public abstract class StatementVisitor<R, C> { return visitStatement(loadTsFileStatement, context); } + public R visitPipeEnrichedLoadFile( + PipeEnrichedLoadTsFileStatement pipeEnrichedLoadTsFileStatement, C context) { + return visitStatement(pipeEnrichedLoadTsFileStatement, context); + } + public R visitInsertRow(InsertRowStatement insertRowStatement, C context) { return visitStatement(insertRowStatement, context); } @@ -300,8 +306,9 @@ public abstract class StatementVisitor<R, C> { return visitStatement(insertRowsOfOneDeviceStatement, context); } - public R visitPipeEnrichedStatement(PipeEnrichedStatement pipeEnrichedStatement, C context) { - return visitStatement(pipeEnrichedStatement, context); + public R visitPipeEnrichedInsert( + PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C context) { + return visitStatement(pipeEnrichedInsertBaseStatement, context); } /** Data Control Language (DCL) */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java new file mode 100644 index 00000000000..4b659b4ab15 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.crud; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement { + + private InsertBaseStatement insertBaseStatement; + + public PipeEnrichedInsertBaseStatement(InsertBaseStatement insertBaseStatement) { + statementType = StatementType.PIPE_ENRICHED_INSERT; + this.insertBaseStatement = insertBaseStatement; + } + + public InsertBaseStatement getInsertBaseStatement() { + return insertBaseStatement; + } + + public void setInsertBaseStatement(InsertBaseStatement insertBaseStatement) { + this.insertBaseStatement = insertBaseStatement; + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitPipeEnrichedInsert(this, context); + } + + @Override + public boolean isDebug() { + return insertBaseStatement.isDebug(); + } + + @Override + public void setDebug(boolean debug) { + insertBaseStatement.setDebug(debug); + } + + @Override + public boolean isQuery() { + return !Objects.isNull(insertBaseStatement) && insertBaseStatement.isQuery(); + } + + @Override + public PartialPath getDevicePath() { + return insertBaseStatement.getDevicePath(); + } + + @Override + public void setDevicePath(PartialPath devicePath) { + insertBaseStatement.setDevicePath(devicePath); + } + + @Override + public String[] getMeasurements() { + return insertBaseStatement.getMeasurements(); + } + + @Override + public void setMeasurements(String[] measurements) { + insertBaseStatement.setMeasurements(measurements); + } + + @Override + public MeasurementSchema[] getMeasurementSchemas() { + return insertBaseStatement.getMeasurementSchemas(); + } + + @Override + public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) { + insertBaseStatement.setMeasurementSchemas(measurementSchemas); + } + + @Override + public boolean isAligned() { + return insertBaseStatement.isAligned(); + } + + @Override + public void setAligned(boolean aligned) { + insertBaseStatement.setAligned(aligned); + } + + @Override + public TSDataType[] getDataTypes() { + return insertBaseStatement.getDataTypes(); + } + + @Override + public void setDataTypes(TSDataType[] dataTypes) { + insertBaseStatement.setDataTypes(dataTypes); + } + + @Override + public List<PartialPath> getPaths() { + return insertBaseStatement.getPaths(); + } + + @Override + public void updateAfterSchemaValidation() throws QueryProcessException { + insertBaseStatement.updateAfterSchemaValidation(); + } + + @Override + protected void selfCheckDataTypes(int index) + throws DataTypeMismatchException, PathNotExistException { + insertBaseStatement.selfCheckDataTypes(index); + } + + @Override + public void markFailedMeasurement(int index, Exception cause) { + insertBaseStatement.markFailedMeasurement(index, cause); + } + + @Override + public boolean hasValidMeasurements() { + return insertBaseStatement.hasValidMeasurements(); + } + + @Override + public boolean hasFailedMeasurements() { + return insertBaseStatement.hasFailedMeasurements(); + } + + @Override + public int getFailedMeasurementNumber() { + return insertBaseStatement.getFailedMeasurementNumber(); + } + + @Override + public List<String> getFailedMeasurements() { + return insertBaseStatement.getFailedMeasurements(); + } + + @Override + public List<Exception> getFailedExceptions() { + return insertBaseStatement.getFailedExceptions(); + } + + @Override + public List<String> getFailedMessages() { + return insertBaseStatement.getFailedMessages(); + } + + @Override + public void setFailedMeasurementIndex2Info( + Map<Integer, InsertBaseStatement.FailedMeasurementInfo> failedMeasurementIndex2Info) { + insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info); + } + + @Override + protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() { + return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex(); + } + + @Override + public boolean isEmpty() { + return insertBaseStatement.isEmpty(); + } + + @Override + public ISchemaValidation getSchemaValidation() { + return insertBaseStatement.getSchemaValidation(); + } + + @Override + public List<ISchemaValidation> getSchemaValidationList() { + return insertBaseStatement.getSchemaValidationList(); + } + + @Override + protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + return insertBaseStatement.checkAndCastDataType(columnIndex, dataType); + } + + @Override + public long getMinTime() { + return insertBaseStatement.getMinTime(); + } + + @Override + public Object getFirstValueOfIndex(int index) { + return insertBaseStatement.getFirstValueOfIndex(index); + } + + @Override + public InsertBaseStatement removeLogicalView() { + return new PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java new file mode 100644 index 00000000000..c2d9f349052 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.crud; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import java.io.File; +import java.util.List; + +public class PipeEnrichedLoadTsFileStatement extends LoadTsFileStatement { + + private final LoadTsFileStatement loadTsFileStatement; + + public PipeEnrichedLoadTsFileStatement(LoadTsFileStatement loadTsFileStatement) { + this.loadTsFileStatement = loadTsFileStatement; + statementType = StatementType.MULTI_BATCH_INSERT; + } + + public LoadTsFileStatement getLoadTsFileStatement() { + return loadTsFileStatement; + } + + @Override + public boolean isDebug() { + return loadTsFileStatement.isDebug(); + } + + @Override + public void setDebug(boolean debug) { + loadTsFileStatement.setDebug(debug); + } + + @Override + public boolean isQuery() { + return loadTsFileStatement.isQuery(); + } + + @Override + public void setDeleteAfterLoad(boolean deleteAfterLoad) { + loadTsFileStatement.setDeleteAfterLoad(deleteAfterLoad); + } + + @Override + public void setDatabaseLevel(int databaseLevel) { + loadTsFileStatement.setDatabaseLevel(databaseLevel); + } + + @Override + public void setVerifySchema(boolean verifySchema) { + loadTsFileStatement.setVerifySchema(verifySchema); + } + + @Override + public void setAutoCreateDatabase(boolean autoCreateDatabase) { + loadTsFileStatement.setAutoCreateDatabase(autoCreateDatabase); + } + + @Override + public boolean isVerifySchema() { + return loadTsFileStatement.isVerifySchema(); + } + + @Override + public boolean isDeleteAfterLoad() { + return loadTsFileStatement.isDeleteAfterLoad(); + } + + @Override + public boolean isAutoCreateDatabase() { + return loadTsFileStatement.isAutoCreateDatabase(); + } + + @Override + public int getDatabaseLevel() { + return loadTsFileStatement.getDatabaseLevel(); + } + + @Override + public List<File> getTsFiles() { + return loadTsFileStatement.getTsFiles(); + } + + @Override + public void addTsFileResource(TsFileResource resource) { + loadTsFileStatement.addTsFileResource(resource); + } + + @Override + public List<TsFileResource> getResources() { + return loadTsFileStatement.getResources(); + } + + @Override + public void addWritePointCount(long writePointCount) { + loadTsFileStatement.addWritePointCount(writePointCount); + } + + @Override + public long getWritePointCount(int resourceIndex) { + return loadTsFileStatement.getWritePointCount(resourceIndex); + } + + @Override + public List<PartialPath> getPaths() { + return loadTsFileStatement.getPaths(); + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitPipeEnrichedLoadFile(this, context); + } + + @Override + public String toString() { + return "PipeEnrichedLoadTsFileStatement{" + "loadTsFileStatement=" + loadTsFileStatement + '}'; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java deleted file mode 100644 index 1f2080be372..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.statement.pipe; - -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; -import org.apache.iotdb.db.queryengine.plan.statement.StatementType; -import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; - -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -public class PipeEnrichedStatement extends Statement { - - private Statement innerStatement; - - public PipeEnrichedStatement(Statement innerStatement) { - statementType = StatementType.PIPE_ENRICHED; - this.innerStatement = innerStatement; - } - - public Statement getInnerStatement() { - return innerStatement; - } - - public void setInnerStatement(Statement innerStatement) { - this.innerStatement = innerStatement; - } - - @Override - public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { - return visitor.visitPipeEnrichedStatement(this, context); - } - - @Override - public boolean isDebug() { - return innerStatement.isDebug(); - } - - @Override - public void setDebug(boolean debug) { - innerStatement.setDebug(debug); - } - - @Override - public boolean isQuery() { - return !Objects.isNull(innerStatement) && innerStatement.isQuery(); - } - - @Override - public List<PartialPath> getPaths() { - return Collections.emptyList(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java index 8c85795f1c9..70037a99089 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java @@ -85,7 +85,7 @@ public class DataNodeThrottleQuotaManager { case BATCH_INSERT_ONE_DEVICE: case BATCH_INSERT_ROWS: case MULTI_BATCH_INSERT: - case PIPE_ENRICHED: + case PIPE_ENRICHED_INSERT: return checkQuota(userName, 1, 0, s); case QUERY: case GROUP_BY_TIME: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java index 5d5882430d5..b6b714b5de2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java @@ -29,7 +29,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement; import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.BitMap; @@ -94,10 +95,10 @@ public class DefaultOperationQuota implements OperationQuota { protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement s) { if (numWrites > 0) { long avgSize = 0; - if (s.getType() == StatementType.PIPE_ENRICHED) { - s = ((PipeEnrichedStatement) s).getInnerStatement(); - } - final StatementType statementType = s.getType(); + final StatementType statementType = + s.getType() == StatementType.PIPE_ENRICHED_INSERT + ? ((PipeEnrichedInsertBaseStatement) s).getInsertBaseStatement().getType() + : s.getType(); switch (statementType) { case INSERT: // InsertStatement InsertRowStatement @@ -137,7 +138,10 @@ public class DefaultOperationQuota implements OperationQuota { } break; case MULTI_BATCH_INSERT: - // LoadTsFileStatement InsertMultiTabletsStatement + // PipeEnrichedLoadTsFileStatement LoadTsFileStatement InsertMultiTabletsStatement + if (s instanceof PipeEnrichedLoadTsFileStatement) { + s = ((PipeEnrichedLoadTsFileStatement) s).getLoadTsFileStatement(); + } if (s instanceof LoadTsFileStatement) { LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) s; for (int i = 0; i < loadTsFileStatement.getResources().size(); i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java index e791faad552..68f87bcc86f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java @@ -36,13 +36,13 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnrichedInsertNode; import org.apache.iotdb.db.trigger.service.TriggerManagementService; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp; @@ -253,7 +253,20 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv @Override public TriggerFireResult visitPipeEnrichedInsert( PipeEnrichedInsertNode node, TriggerEvent context) { - return node.getInsertNode().accept(this, context); + final InsertNode realInsertNode = node.getInsertNode(); + if (realInsertNode instanceof InsertRowNode) { + return visitInsertRow((InsertRowNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertTabletNode) { + return visitInsertTablet((InsertTabletNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsNode) { + return visitInsertRows((InsertRowsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertMultiTabletsNode) { + return visitInsertMultiTablets((InsertMultiTabletsNode) realInsertNode, context); + } else if (realInsertNode instanceof InsertRowsOfOneDeviceNode) { + return visitInsertRowsOfOneDevice((InsertRowsOfOneDeviceNode) realInsertNode, context); + } else { + return visitPlan(realInsertNode, context); + } } private Map<String, Integer> constructMeasurementToSchemaIndexMap(
