This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch mergeLastQueryScanNodeOfSameDevice in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2955c26453f48fc308b387dfb3c3638364c556be Author: shuwenwei <[email protected]> AuthorDate: Mon Jun 16 19:04:15 2025 +0800 add DeviceLastQueryScanNode --- .../operator/process/last/LastQueryOperator.java | 1 + .../plan/planner/LogicalPlanBuilder.java | 72 +++--- .../plan/planner/OperatorTreeGenerator.java | 123 +++++------ .../plan/planner/SubPlanTypeExtractor.java | 10 +- .../planner/distribution/ExchangeNodeAdder.java | 11 +- .../plan/planner/distribution/SourceRewriter.java | 42 ++-- .../plan/planner/plan/node/PlanGraphPrinter.java | 24 +- .../plan/planner/plan/node/PlanNodeType.java | 12 +- .../plan/planner/plan/node/PlanVisitor.java | 9 +- .../node/process/last/LastQueryCollectNode.java | 2 +- .../plan/node/process/last/LastQueryMergeNode.java | 2 +- .../plan/node/process/last/LastQueryNode.java | 2 +- .../node/process/last/LastQueryTransformNode.java | 2 +- .../plan/node/source/AlignedLastQueryScanNode.java | 245 --------------------- ...yScanNode.java => DeviceLastQueryScanNode.java} | 191 ++++++++++++---- .../plan/planner/distribution/LastQueryTest.java | 14 +- .../logical/DataQueryLogicalPlannerTest.java | 83 ++++--- 17 files changed, 320 insertions(+), 525 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java index 9f50dcb6e7d..ad3f3f6dd05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java @@ -116,6 +116,7 @@ public class LastQueryOperator implements ProcessOperator { return null; } else if (!tsBlock.isEmpty()) { LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock); + continue; } } else { children.get(currentIndex).close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index fe71feb4639..ae8f92ce578 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -81,10 +81,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; @@ -110,6 +109,7 @@ import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGene import org.apache.commons.lang3.Validate; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; import java.util.Arrays; @@ -278,46 +278,34 @@ public class LogicalPlanBuilder { ? sourceExpression.getViewPath().getFullPath() : null; - if (selectedPath.isUnderAlignedEntity()) { // aligned series - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), - new AlignedPath(selectedPath), - outputViewPath))); - } else { // non-aligned series - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath))); - } - } - } else { - if (deviceAlignedSet.contains(outputDevice)) { - // aligned series - List<MeasurementPath> measurementPaths = - measurementToExpressionsOfDevice.values().stream() - .map(expression -> (MeasurementPath) ((TimeSeriesOperand) expression).getPath()) - .collect(Collectors.toList()); - AlignedPath alignedPath = new AlignedPath(measurementPaths.get(0).getDevicePath()); - for (MeasurementPath measurementPath : measurementPaths) { - alignedPath.addMeasurement(measurementPath); - } sourceNodeList.add( reserveMemoryForSeriesSourceNode( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), alignedPath, null))); - } else { - // non-aligned series - for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { - MeasurementPath selectedPath = - (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, null))); - } + new DeviceLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + selectedPath.getDevicePath(), + selectedPath.isUnderAlignedEntity(), + Collections.singletonList(selectedPath.getMeasurementSchema()), + outputViewPath))); + } + } else { + boolean aligned = false; + List<IMeasurementSchema> measurementSchemas = + new ArrayList<>(measurementToExpressionsOfDevice.size()); + PartialPath devicePath = null; + for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { + MeasurementPath selectedPath = + (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); + aligned = selectedPath.isUnderAlignedEntity(); + devicePath = devicePath == null ? selectedPath.getDevicePath() : devicePath; + measurementSchemas.add(selectedPath.getMeasurementSchema()); } + sourceNodeList.add( + new DeviceLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + devicePath, + aligned, + measurementSchemas, + null)); } } @@ -328,10 +316,8 @@ public class LogicalPlanBuilder { Comparator.comparing( child -> { String sortKey = ""; - if (child instanceof LastQueryScanNode) { - sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); - } else if (child instanceof AlignedLastQueryScanNode) { - sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort(); + if (child instanceof DeviceLastQueryScanNode) { + sortKey = ((DeviceLastQueryScanNode) child).getOutputSymbolForSort(); } else if (child instanceof LastQueryTransformNode) { sortKey = ((LastQueryTransformNode) child).getOutputSymbolForSort(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 59f717e2860..25140d0072e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -229,11 +229,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; @@ -2814,55 +2813,18 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((SchemaDriverContext) (context.getDriverContext())).getSchemaRegion()); } - @Override - public Operator visitLastQueryScan( - final LastQueryScanNode node, final LocalExecutionPlanContext context) { - final MeasurementPath seriesPath = node.getSeriesPath(); - TimeValuePair timeValuePair = null; - context.dataNodeQueryContext.lock(); - try { - if (!context.dataNodeQueryContext.unCached(seriesPath)) { - timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath); - if (timeValuePair == null) { - context.dataNodeQueryContext.addUnCachePath(seriesPath, node.getDataNodeSeriesScanNum()); - } - } - } finally { - context.dataNodeQueryContext.unLock(); - } - - if (timeValuePair == null) { // last value is not cached - return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); - } else if (timeValuePair.getValue() - == TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE) { // there is no data for this time series - return null; - } else if (!LastQueryUtil.satisfyFilter( - updateFilterUsingTTL( - context.getGlobalTimeFilter(), - DataNodeTTLCache.getInstance().getTTLForTree(seriesPath.getDevicePath().getNodes())), - timeValuePair)) { // cached last value is not satisfied - - if (!isFilterGtOrGe(context.getGlobalTimeFilter())) { - // time filter is not > or >=, we still need to read from disk - return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); - } else { // otherwise, we just ignore it and return null - return null; - } - } else { // cached last value is satisfied, put it into LastCacheScanOperator - context.addCachedLastValue(timeValuePair, node.outputPathSymbol()); - return null; - } - } - public static boolean isFilterGtOrGe(Filter filter) { return filter instanceof TimeGt || filter instanceof TimeGtEq; } private UpdateLastCacheOperator createUpdateLastCacheOperator( - final LastQueryScanNode node, - final LocalExecutionPlanContext context, - final MeasurementPath fullPath) { - final SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context); + final DeviceLastQueryScanNode node, final LocalExecutionPlanContext context, final int idx) { + IMeasurementSchema measurementSchema = node.getMeasurementSchemas().get(idx); + final SeriesAggregationScanOperator lastQueryScan = + createLastQueryScanOperator(node, context, measurementSchema); + MeasurementPath fullPath = + node.getDevicePath().concatAsMeasurementPath(measurementSchema.getMeasurementName()); + fullPath.setMeasurementSchema(measurementSchema); final OperatorContext operatorContext = context .getDriverContext() @@ -2887,7 +2849,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP operatorContext, lastQueryScan, fullPath, - node.getSeriesPath().getSeriesType(), + measurementSchema.getType(), DATA_NODE_SCHEMA_CACHE, isNeedUpdateLastCache, context.isNeedUpdateNullEntry()) @@ -2895,7 +2857,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP operatorContext, lastQueryScan, fullPath, - node.getSeriesPath().getSeriesType(), + measurementSchema.getType(), DATA_NODE_SCHEMA_CACHE, isNeedUpdateLastCache, context.isNeedUpdateNullEntry(), @@ -2903,19 +2865,20 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator( - final AlignedLastQueryScanNode node, + final String outputViewPath, + final PlanNodeId planNodeId, final AlignedPath unCachedPath, final LocalExecutionPlanContext context) { final AlignedSeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator( - node, (AlignedFullPath) IFullPath.convertToIFullPath(unCachedPath), context); + planNodeId, (AlignedFullPath) IFullPath.convertToIFullPath(unCachedPath), context); final OperatorContext operatorContext = context .getDriverContext() .addOperatorContext( context.getNextOperatorId(), - node.getPlanNodeId(), + planNodeId, AlignedUpdateLastCacheOperator.class.getSimpleName()); final boolean isNeedUpdateLastCache = context.isNeedUpdateLastCache(); @@ -2939,7 +2902,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } } - return Objects.isNull(node.getOutputViewPath()) + return Objects.isNull(outputViewPath) ? new AlignedUpdateLastCacheOperator( operatorContext, lastQueryScan, @@ -2954,13 +2917,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP DATA_NODE_SCHEMA_CACHE, isNeedUpdateLastCache, context.isNeedUpdateNullEntry(), - node.getOutputViewPath()); + outputViewPath); } private SeriesAggregationScanOperator createLastQueryScanOperator( - LastQueryScanNode node, LocalExecutionPlanContext context) { + DeviceLastQueryScanNode node, + LocalExecutionPlanContext context, + IMeasurementSchema measurementSchema) { NonAlignedFullPath seriesPath = - (NonAlignedFullPath) IFullPath.convertToIFullPath(node.getSeriesPath()); + new NonAlignedFullPath(node.getDevicePath().getIDeviceID(), measurementSchema); OperatorContext operatorContext = context .getDriverContext() @@ -2977,7 +2942,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors( - context.getAllSensors(seriesPath.getDeviceId(), seriesPath.getMeasurement())); + context.getAllSensors(seriesPath.getDeviceId(), measurementSchema.getMeasurementName())); scanOptionsBuilder.withGlobalTimeFilter(context.getGlobalTimeFilter()); SeriesAggregationScanOperator seriesAggregationScanOperator = @@ -2999,9 +2964,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } private AlignedSeriesAggregationScanOperator createLastQueryScanOperator( - AlignedLastQueryScanNode node, - AlignedFullPath unCachedPath, - LocalExecutionPlanContext context) { + PlanNodeId planNodeId, AlignedFullPath unCachedPath, LocalExecutionPlanContext context) { // last_time, last_value List<TreeAggregator> aggregators = new ArrayList<>(); boolean canUseStatistics = true; @@ -3025,11 +2988,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .getDriverContext() .addOperatorContext( context.getNextOperatorId(), - node.getPlanNodeId(), + planNodeId, AlignedSeriesAggregationScanOperator.class.getSimpleName()); AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = new AlignedSeriesAggregationScanOperator( - node.getPlanNodeId(), + planNodeId, unCachedPath, Ordering.DESC, scanOptionsBuilder.build(), @@ -3046,16 +3009,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } @Override - public Operator visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, LocalExecutionPlanContext context) { - AlignedPath alignedPath = node.getSeriesPath(); - PartialPath devicePath = alignedPath.getDevicePath(); - // get series under aligned entity that has not been cached + public Operator visitDeviceLastQueryScan( + DeviceLastQueryScanNode node, LocalExecutionPlanContext context) { + final PartialPath devicePath = node.getDevicePath(); + List<IMeasurementSchema> measurementSchemas = node.getMeasurementSchemas(); List<Integer> unCachedMeasurementIndexes = new ArrayList<>(); - List<String> measurementList = alignedPath.getMeasurementList(); - for (int i = 0; i < measurementList.size(); i++) { + for (int i = 0; i < measurementSchemas.size(); i++) { + IMeasurementSchema measurementSchema = measurementSchemas.get(i); final MeasurementPath measurementPath = - devicePath.concatAsMeasurementPath(measurementList.get(i)); + devicePath.concatAsMeasurementPath(measurementSchema.getMeasurementName()); TimeValuePair timeValuePair = null; context.dataNodeQueryContext.lock(); try { @@ -3094,12 +3056,29 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } if (unCachedMeasurementIndexes.isEmpty()) { return null; + } + if (node.isAligned()) { + AlignedPath unCachedPath = new AlignedPath(node.getDevicePath()); + for (int i : unCachedMeasurementIndexes) { + IMeasurementSchema measurementSchema = measurementSchemas.get(i); + unCachedPath.addMeasurement(measurementSchema.getMeasurementName(), measurementSchema); + } + return createAlignedUpdateLastCacheOperator( + node.getOutputViewPath(), node.getPlanNodeId(), unCachedPath, context); } else { - AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath()); + List<Operator> operators = new ArrayList<>(unCachedMeasurementIndexes.size()); for (int i : unCachedMeasurementIndexes) { - unCachedPath.addMeasurement(measurementList.get(i), alignedPath.getSchemaList().get(i)); + Operator operator = createUpdateLastCacheOperator(node, context, i); + operators.add(operator); } - return createAlignedUpdateLastCacheOperator(node, unCachedPath, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + LastQueryCollectOperator.class.getSimpleName()); + return new LastQueryCollectOperator(operatorContext, operators); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java index a7982c43dc8..fceff338cde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java @@ -34,9 +34,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -130,12 +129,7 @@ public class SubPlanTypeExtractor { // region PlanNode of last read // No need to deal with type of last read @Override - public Void visitLastQueryScan(LastQueryScanNode node, Void context) { - return null; - } - - @Override - public Void visitAlignedLastQueryScan(AlignedLastQueryScanNode node, Void context) { + public Void visitDeviceLastQueryScan(DeviceLastQueryScanNode node, Void context) { return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 4fee3052a54..f2502d4247e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -61,10 +61,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; @@ -191,13 +190,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { } @Override - public PlanNode visitLastQueryScan(LastQueryScanNode node, NodeGroupContext context) { - return processNoChildSourceNode(node, context); - } - - @Override - public PlanNode visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, NodeGroupContext context) { + public PlanNode visitDeviceLastQueryScan(DeviceLastQueryScanNode node, NodeGroupContext context) { return processNoChildSourceNode(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index b2a43684a4b..e0dd18cf363 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -61,11 +61,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.Inner import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; @@ -82,8 +81,10 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Binary; import java.util.ArrayList; import java.util.Arrays; @@ -756,16 +757,8 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> } @Override - public List<PlanNode> visitLastQueryScan( - LastQueryScanNode node, DistributionPlanContext context) { - LastQueryNode mergeNode = - new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(), null, false); - return processRawSeriesScan(node, context, mergeNode); - } - - @Override - public List<PlanNode> visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, DistributionPlanContext context) { + public List<PlanNode> visitDeviceLastQueryScan( + DeviceLastQueryScanNode node, DistributionPlanContext context) { LastQueryNode mergeNode = new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(), null, false); return processRawSeriesScan(node, context, mergeNode); @@ -981,8 +974,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> private void addSortForEachLastQueryNode(PlanNode root, Ordering timeseriesOrdering) { if (root instanceof LastQueryNode - && (root.getChildren().get(0) instanceof LastQueryScanNode - || root.getChildren().get(0) instanceof AlignedLastQueryScanNode)) { + && (root.getChildren().get(0) instanceof DeviceLastQueryScanNode)) { LastQueryNode lastQueryNode = (LastQueryNode) root; lastQueryNode.setTimeseriesOrdering(timeseriesOrdering); // sort children node @@ -992,10 +984,8 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> Comparator.comparing( child -> { String sortKey = ""; - if (child instanceof LastQueryScanNode) { - sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); - } else if (child instanceof AlignedLastQueryScanNode) { - sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort(); + if (child instanceof DeviceLastQueryScanNode) { + sortKey = ((DeviceLastQueryScanNode) child).getOutputSymbolForSort(); } return sortKey; })) @@ -1004,11 +994,17 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> .getChildren() .forEach( child -> { - if (child instanceof AlignedLastQueryScanNode) { - // sort the measurements of AlignedPath for LastQueryMergeOperator - ((AlignedLastQueryScanNode) child) - .getSeriesPath() - .sortMeasurement(Comparator.naturalOrder()); + if (child instanceof DeviceLastQueryScanNode) { + // sort the measurements for LastQueryMergeOperator + ((DeviceLastQueryScanNode) child) + .getMeasurementSchemas() + .sort( + Comparator.comparing( + iMeasurementSchema -> + new Binary( + iMeasurementSchema.getMeasurementName(), + TSFileConfig.STRING_CHARSET), + Comparator.naturalOrder())); } }); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index eaf0d700ba6..17149992f55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -55,10 +55,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -544,26 +543,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter } @Override - public List<String> visitLastQueryScan(LastQueryScanNode node, GraphContext context) { + public List<String> visitDeviceLastQueryScan(DeviceLastQueryScanNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); - boxValue.add(String.format("LastQueryScan-%s", node.getPlanNodeId().getId())); - boxValue.add(String.format("Series: %s", node.getSeriesPath())); - if (StringUtil.isNotBlank(node.getOutputViewPath())) { - boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath())); - } - boxValue.add(printRegion(node.getRegionReplicaSet())); - return render(node, boxValue, context); - } - - @Override - public List<String> visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, GraphContext context) { - List<String> boxValue = new ArrayList<>(); - boxValue.add(String.format("AlignedLastQueryScan-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("DeviceLastQueryScan-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("Aligned: %s", node.isAligned())); boxValue.add( String.format( - "Series: %s%s", - node.getSeriesPath().getIDeviceID(), node.getSeriesPath().getMeasurementList())); + "Series: %s%s", node.getDevicePath().getIDeviceID(), node.getMeasurementSchemas())); if (StringUtil.isNotBlank(node.getOutputViewPath())) { boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index bcae251f358..cdae773d000 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -96,11 +96,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; @@ -198,7 +197,9 @@ public enum PlanNodeType { NODE_MANAGEMENT_MEMORY_MERGE((short) 42), DELETE_DATA((short) 44), DELETE_TIME_SERIES((short) 45), + @Deprecated LAST_QUERY_SCAN((short) 46), + @Deprecated ALIGNED_LAST_QUERY_SCAN((short) 47), LAST_QUERY((short) 48), LAST_QUERY_MERGE((short) 49), @@ -258,6 +259,8 @@ public enum PlanNodeType { CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97), + DEVICE_LAST_QUERY_SCAN((short) 98), + CREATE_OR_UPDATE_TABLE_DEVICE((short) 902), TABLE_DEVICE_QUERY_SCAN((short) 903), TABLE_DEVICE_FETCH((short) 904), @@ -475,9 +478,8 @@ public enum PlanNodeType { case 45: return DeleteTimeSeriesNode.deserialize(buffer); case 46: - return LastQueryScanNode.deserialize(buffer); case 47: - return AlignedLastQueryScanNode.deserialize(buffer); + throw new UnsupportedOperationException("LastQueryScanNode is deprecated"); case 48: return LastQueryNode.deserialize(buffer); case 49: @@ -580,6 +582,8 @@ public enum PlanNodeType { case 97: throw new UnsupportedOperationException( "You should never see ContinuousSameSearchIndexSeparatorNode in this function, because ContinuousSameSearchIndexSeparatorNode should never be used in network transmission."); + case 98: + return DeviceLastQueryScanNode.deserialize(buffer); case 902: return CreateOrUpdateTableDeviceNode.deserialize(buffer); case 903: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 331bbc3a261..28a7196080d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -97,11 +97,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; @@ -195,11 +194,7 @@ public abstract class PlanVisitor<R, C> { return visitSeriesAggregationSourceNode(node, context); } - public R visitLastQueryScan(LastQueryScanNode node, C context) { - return visitSourceNode(node, context); - } - - public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context) { + public R visitDeviceLastQueryScan(DeviceLastQueryScanNode node, C context) { return visitSourceNode(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java index ef40d6cfd17..340d949721f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java @@ -30,7 +30,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class LastQueryCollectNode extends MultiChildProcessNode { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java index 7a7cfabff6a..6b854589ba6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java @@ -33,7 +33,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class LastQueryMergeNode extends MultiChildProcessNode { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java index 5d0589e40d4..81a1cab4723 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java @@ -35,7 +35,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class LastQueryNode extends MultiChildProcessNode { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java index 53f85d520a1..cac08fd09f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java @@ -33,7 +33,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class LastQueryTransformNode extends SingleChildProcessNode { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java deleted file mode 100644 index ad834e9373b..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; - -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.path.PathDeserializeUtil; -import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; - -import com.google.common.collect.ImmutableList; -import org.apache.tsfile.utils.RamUsageEstimator; -import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.eclipse.jetty.util.StringUtil; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; - -public class AlignedLastQueryScanNode extends LastSeriesSourceNode { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class); - - // The path of the target series which will be scanned. - private final AlignedPath seriesPath; - - // The id of DataRegion where the node will run - private TRegionReplicaSet regionReplicaSet; - - private final String outputViewPath; - - public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath, String outputViewPath) { - super(id, new AtomicInteger(1)); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - } - - public AlignedLastQueryScanNode( - PlanNodeId id, - AlignedPath seriesPath, - AtomicInteger dataNodeSeriesScanNum, - String outputViewPath, - TRegionReplicaSet regionReplicaSet) { - super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - this.regionReplicaSet = regionReplicaSet; - } - - public AlignedLastQueryScanNode( - PlanNodeId id, - AlignedPath seriesPath, - AtomicInteger dataNodeSeriesScanNum, - String outputViewPath) { - super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - } - - public String getOutputViewPath() { - return outputViewPath; - } - - @Override - public void open() throws Exception { - // Do nothing - } - - @Override - public TRegionReplicaSet getRegionReplicaSet() { - return regionReplicaSet; - } - - @Override - public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { - this.regionReplicaSet = regionReplicaSet; - } - - @Override - public void close() throws Exception { - // Do nothing - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(); - } - - @Override - public void addChild(PlanNode child) { - throw new UnsupportedOperationException("no child is allowed for SeriesScanNode"); - } - - @Override - public PlanNodeType getType() { - return PlanNodeType.ALIGNED_LAST_QUERY_SCAN; - } - - @Override - public PlanNode clone() { - return new AlignedLastQueryScanNode( - getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(), outputViewPath, regionReplicaSet); - } - - @Override - public int allowedChildCount() { - return NO_CHILD_ALLOWED; - } - - @Override - public List<String> getOutputColumnNames() { - return LAST_QUERY_HEADER_COLUMNS; - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitAlignedLastQueryScan(this, context); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o; - return Objects.equals(seriesPath, that.seriesPath) - && Objects.equals(outputViewPath, that.outputViewPath) - && Objects.equals(regionReplicaSet, that.regionReplicaSet); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), seriesPath, outputViewPath, regionReplicaSet); - } - - @Override - public String toString() { - if (StringUtil.isNotBlank(outputViewPath)) { - return String.format( - "AlignedLastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s, DataRegion: %s]", - this.getPlanNodeId(), - this.getSeriesPath().getFormattedString(), - this.getOutputViewPath(), - PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); - } else { - return String.format( - "AlignedLastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]", - this.getPlanNodeId(), - this.getSeriesPath().getFormattedString(), - PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); - } - } - - @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer); - seriesPath.serialize(byteBuffer); - ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer); - ReadWriteIOUtils.write(outputViewPath == null, byteBuffer); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, byteBuffer); - } - } - - @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(stream); - seriesPath.serialize(stream); - ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream); - ReadWriteIOUtils.write(outputViewPath == null, stream); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, stream); - } - } - - public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) { - AlignedPath partialPath = (AlignedPath) PathDeserializeUtil.deserialize(byteBuffer); - int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer); - boolean isNull = ReadWriteIOUtils.readBool(byteBuffer); - String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer); - PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new AlignedLastQueryScanNode( - planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum), outputPathSymbol); - } - - public AlignedPath getSeriesPath() { - return seriesPath; - } - - public String getOutputSymbolForSort() { - if (outputViewPath != null) { - return outputViewPath; - } - if (seriesPath.getMeasurementList().size() > 1) { - return seriesPath.getIDeviceID().toString(); - } - return seriesPath.transformToPartialPath().getFullPath(); - } - - @Override - public PartialPath getPartitionPath() { - return getSeriesPath(); - } - - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) - + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) - + RamUsageEstimator.sizeOf(outputViewPath); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceLastQueryScanNode.java similarity index 50% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceLastQueryScanNode.java index 2663c6e0c9c..ac8ed4c62a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceLastQueryScanNode.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -33,19 +34,24 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.VectorMeasurementSchema; import org.eclipse.jetty.util.StringUtil; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -public class LastQueryScanNode extends LastSeriesSourceNode { +public class DeviceLastQueryScanNode extends LastSeriesSourceNode { private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(LastQueryScanNode.class); + RamUsageEstimator.shallowSizeOfInstance(DeviceLastQueryScanNode.class); public static final List<String> LAST_QUERY_HEADER_COLUMNS = ImmutableList.of( @@ -53,38 +59,71 @@ public class LastQueryScanNode extends LastSeriesSourceNode { ColumnHeaderConstant.VALUE, ColumnHeaderConstant.DATATYPE); - // The path of the target series which will be scanned. - private final MeasurementPath seriesPath; + private final PartialPath devicePath; + private final boolean aligned; + private final List<IMeasurementSchema> measurementSchemas; private final String outputViewPath; // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; - public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, String outputViewPath) { + public DeviceLastQueryScanNode( + PlanNodeId id, + PartialPath devicePath, + boolean aligned, + List<IMeasurementSchema> measurementSchemas, + String outputViewPath) { + super(id, new AtomicInteger(1)); + this.aligned = aligned; + this.devicePath = devicePath; + this.measurementSchemas = measurementSchemas; + this.outputViewPath = outputViewPath; + } + + public DeviceLastQueryScanNode( + PlanNodeId id, MeasurementPath measurementPath, String outputViewPath) { super(id, new AtomicInteger(1)); - this.seriesPath = seriesPath; + this.aligned = measurementPath.isUnderAlignedEntity(); + this.devicePath = measurementPath.getDevicePath(); + this.measurementSchemas = Collections.singletonList(measurementPath.getMeasurementSchema()); this.outputViewPath = outputViewPath; } - public LastQueryScanNode( + public DeviceLastQueryScanNode(PlanNodeId id, AlignedPath alignedPath, String outputViewPath) { + super(id, new AtomicInteger(1)); + this.aligned = true; + this.devicePath = alignedPath.getDevicePath(); + this.measurementSchemas = alignedPath.getSchemaList(); + this.outputViewPath = outputViewPath; + } + + public DeviceLastQueryScanNode( PlanNodeId id, - MeasurementPath seriesPath, + PartialPath devicePath, + boolean aligned, + List<IMeasurementSchema> measurementSchemas, AtomicInteger dataNodeSeriesScanNum, String outputViewPath) { super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; + this.aligned = aligned; + this.devicePath = devicePath; + this.measurementSchemas = measurementSchemas; this.outputViewPath = outputViewPath; } - public LastQueryScanNode( + public DeviceLastQueryScanNode( PlanNodeId id, - MeasurementPath seriesPath, + PartialPath devicePath, + boolean aligned, + List<IMeasurementSchema> measurementSchemas, AtomicInteger dataNodeSeriesScanNum, String outputViewPath, TRegionReplicaSet regionReplicaSet) { super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; + this.devicePath = devicePath; + this.aligned = aligned; + this.measurementSchemas = measurementSchemas; this.outputViewPath = outputViewPath; this.regionReplicaSet = regionReplicaSet; } @@ -102,8 +141,12 @@ public class LastQueryScanNode extends LastSeriesSourceNode { this.regionReplicaSet = regionReplicaSet; } - public MeasurementPath getSeriesPath() { - return seriesPath; + public PartialPath getSeriesPath() { + return devicePath; + } + + public boolean isAligned() { + return this.aligned; } public String getOutputViewPath() { @@ -114,7 +157,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { if (outputViewPath != null) { return outputViewPath; } - return seriesPath.getFullPath(); + return devicePath.toString(); } @Override @@ -132,13 +175,19 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override public PlanNodeType getType() { - return PlanNodeType.LAST_QUERY_SCAN; + return PlanNodeType.DEVICE_LAST_QUERY_SCAN; } @Override public PlanNode clone() { - return new LastQueryScanNode( - getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(), outputViewPath, regionReplicaSet); + return new DeviceLastQueryScanNode( + getPlanNodeId(), + devicePath, + aligned, + measurementSchemas, + getDataNodeSeriesScanNum(), + outputViewPath, + regionReplicaSet); } @Override @@ -153,7 +202,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitLastQueryScan(this, context); + return visitor.visitDeviceLastQueryScan(this, context); } @Override @@ -161,39 +210,61 @@ public class LastQueryScanNode extends LastSeriesSourceNode { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; - LastQueryScanNode that = (LastQueryScanNode) o; - return Objects.equals(seriesPath, that.seriesPath) + DeviceLastQueryScanNode that = (DeviceLastQueryScanNode) o; + return Objects.equals(devicePath, that.devicePath) + && Objects.equals(aligned, that.aligned) + && Objects.equals(measurementSchemas, that.measurementSchemas) && Objects.equals(outputViewPath, that.outputViewPath) && Objects.equals(regionReplicaSet, that.regionReplicaSet); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), seriesPath, outputViewPath, regionReplicaSet); + return Objects.hash( + super.hashCode(), + devicePath, + aligned, + measurementSchemas, + outputViewPath, + regionReplicaSet); } @Override public String toString() { if (StringUtil.isNotBlank(outputViewPath)) { return String.format( - "LastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s, DataRegion: %s]", + "DeviceLastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, ViewPath: %s, DataRegion: %s]", this.getPlanNodeId(), - this.getSeriesPath(), + this.getDevicePath(), + this.aligned, + this.getMeasurementSchemas(), this.getOutputViewPath(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } else { return String.format( - "LastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]", + "DeviceLastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, DataRegion: %s]", this.getPlanNodeId(), - this.getSeriesPath(), + this.getDevicePath(), + this.aligned, + this.getMeasurementSchemas(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer); - seriesPath.serialize(byteBuffer); + PlanNodeType.DEVICE_LAST_QUERY_SCAN.serialize(byteBuffer); + devicePath.serialize(byteBuffer); + ReadWriteIOUtils.write(aligned, byteBuffer); + ReadWriteIOUtils.write(measurementSchemas.size(), byteBuffer); + for (IMeasurementSchema measurementSchema : measurementSchemas) { + if (measurementSchema instanceof MeasurementSchema) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else if (measurementSchema instanceof VectorMeasurementSchema) { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + } + measurementSchema.serializeTo(byteBuffer); + } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer); ReadWriteIOUtils.write(outputViewPath == null, byteBuffer); if (outputViewPath != null) { @@ -203,8 +274,18 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.LAST_QUERY_SCAN.serialize(stream); - seriesPath.serialize(stream); + PlanNodeType.DEVICE_LAST_QUERY_SCAN.serialize(stream); + devicePath.serialize(stream); + ReadWriteIOUtils.write(aligned, stream); + ReadWriteIOUtils.write(measurementSchemas.size(), stream); + for (IMeasurementSchema measurementSchema : measurementSchemas) { + if (measurementSchema instanceof MeasurementSchema) { + ReadWriteIOUtils.write((byte) 0, stream); + } else if (measurementSchema instanceof VectorMeasurementSchema) { + ReadWriteIOUtils.write((byte) 1, stream); + } + measurementSchema.serializeTo(stream); + } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream); ReadWriteIOUtils.write(outputViewPath == null, stream); if (outputViewPath != null) { @@ -212,34 +293,54 @@ public class LastQueryScanNode extends LastSeriesSourceNode { } } - public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) { - MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer); + public static DeviceLastQueryScanNode deserialize(ByteBuffer byteBuffer) { + PartialPath devicePath = PartialPath.deserialize(byteBuffer); + boolean aligned = ReadWriteIOUtils.readBool(byteBuffer); + int measurementSize = ReadWriteIOUtils.readInt(byteBuffer); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(measurementSize); + for (int i = 0; i < measurementSize; i++) { + byte type = ReadWriteIOUtils.readByte(byteBuffer); + if (type == 0) { + measurementSchemas.add(MeasurementSchema.deserializeFrom(byteBuffer)); + } else if (type == 1) { + measurementSchemas.add(VectorMeasurementSchema.deserializeFrom(byteBuffer)); + } + } + int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer); boolean isNull = ReadWriteIOUtils.readBool(byteBuffer); String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryScanNode( - planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum), outputPathSymbol); + return new DeviceLastQueryScanNode( + planNodeId, + devicePath, + aligned, + measurementSchemas, + new AtomicInteger(dataNodeSeriesScanNum), + outputPathSymbol); } - @Override - public PartialPath getPartitionPath() { - return getSeriesPath(); + public PartialPath getDevicePath() { + return this.devicePath; } - public String outputPathSymbol() { - if (outputViewPath == null) { - return seriesPath.getFullPath(); - } else { - return outputViewPath; - } + public List<IMeasurementSchema> getMeasurementSchemas() { + return measurementSchemas; + } + + @Override + public PartialPath getPartitionPath() { + return devicePath; } @Override public long ramBytesUsed() { return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) - + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(devicePath) + + measurementSchemas.stream() + .mapToLong(schema -> RamUsageEstimator.sizeOf(schema.getMeasurementName())) + .sum() + RamUsageEstimator.sizeOf(outputViewPath); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java index f0c930d0d00..30e6912f25f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -32,8 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.junit.Assert; import org.junit.Test; @@ -197,14 +195,8 @@ public class LastQueryTest { List<PlanNode> sourceNodeList = new ArrayList<>(); for (String path : paths) { MeasurementPath selectPath = new MeasurementPath(path); - if (selectPath.isUnderAlignedEntity()) { - sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath), null)); - } else { - sourceNodeList.add( - new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath, null)); - } + sourceNodeList.add( + new DeviceLastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath, null)); } PlanNode root = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java index f52b8c241a1..4209f6bdac4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java @@ -39,9 +39,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAgg import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; @@ -53,6 +52,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -77,29 +77,39 @@ public class DataQueryLogicalPlannerTest { // fake initResultNodeContext() queryId.genPlanNodeId(); - LastQueryScanNode d1s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s1"), null); - LastQueryScanNode d1s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s2"), null); - LastQueryScanNode d1s3 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3"), null); - LastQueryScanNode d2s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s1"), null); - LastQueryScanNode d2s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s2"), null); - LastQueryScanNode d2s4 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s4"), null); - AlignedLastQueryScanNode d2a = - new AlignedLastQueryScanNode( + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s2")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s3")).getMeasurementSchema()); + MeasurementPath d1s1Path = (MeasurementPath) schemaMap.get("root.sg.d1.s1"); + DeviceLastQueryScanNode d1s123 = + new DeviceLastQueryScanNode( + queryId.genPlanNodeId(), + d1s1Path.getDevicePath(), + d1s1Path.isUnderAlignedEntity(), + measurementSchemas, + null); + + measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d2.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d2.s2")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d2.s4")).getMeasurementSchema()); + MeasurementPath d2s1Path = (MeasurementPath) schemaMap.get("root.sg.d2.s1"); + DeviceLastQueryScanNode d2s124 = + new DeviceLastQueryScanNode( + queryId.genPlanNodeId(), + d2s1Path.getDevicePath(), + d2s1Path.isUnderAlignedEntity(), + measurementSchemas, + null); + + DeviceLastQueryScanNode d2a = + new DeviceLastQueryScanNode( queryId.genPlanNodeId(), (AlignedPath) schemaMap.get("root.sg.d2.a"), null); - List<PlanNode> sourceNodeList = Arrays.asList(d1s1, d1s2, d1s3, d2a, d2s1, d2s2, d2s4); + List<PlanNode> sourceNodeList = Arrays.asList(d1s123, d2s124, d2a); LastQueryNode lastQueryNode = new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList, Ordering.ASC, false); @@ -115,19 +125,22 @@ public class DataQueryLogicalPlannerTest { // fake initResultNodeContext() queryId.genPlanNodeId(); - LastQueryScanNode d1s3 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3"), null); - LastQueryScanNode d1s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s1"), null); - LastQueryScanNode d1s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s2"), null); - - List<PlanNode> sourceNodeList = Arrays.asList(d1s3, d1s1, d1s2); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d1.s3")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s2")).getMeasurementSchema()); + MeasurementPath s3Path = (MeasurementPath) schemaMap.get("root.sg.d1.s3"); + DeviceLastQueryScanNode d1s312 = + new DeviceLastQueryScanNode( + queryId.genPlanNodeId(), + s3Path.getDevicePath(), + s3Path.isUnderAlignedEntity(), + measurementSchemas, + null); + LastQueryNode lastQueryNode = - new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList, null, false); + new LastQueryNode(queryId.genPlanNodeId(), Collections.singletonList(d1s312), null, false); SortNode sortNode = new SortNode( queryId.genPlanNodeId(),
