This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/rel-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7396767cb731a916607d91a07000e74057498507 Author: Beyyes <[email protected]> AuthorDate: Fri Sep 15 20:03:00 2023 +0800 Enhance last query, support non single base series (#11120) --- .../operator/process/last/LastQueryOperator.java | 6 +- .../process/last/LastQuerySortOperator.java | 4 +- .../process/last/LastQueryTransformOperator.java | 120 ++++++++++++++++++ .../db/queryengine/plan/analyze/Analysis.java | 42 ++++++- .../queryengine/plan/analyze/AnalyzeVisitor.java | 33 +++-- .../schema/lastcache/ILastCacheContainer.java | 2 +- .../plan/planner/LogicalPlanBuilder.java | 134 ++++++++++++++++----- .../plan/planner/LogicalPlanVisitor.java | 11 +- .../plan/planner/OperatorTreeGenerator.java | 26 +++- .../plan/planner/SubPlanTypeExtractor.java | 15 +++ .../planner/distribution/DistributionPlanner.java | 4 +- .../planner/distribution/ExchangeNodeAdder.java | 12 +- .../SimpleFragmentParallelPlanner.java | 5 +- .../plan/planner/distribution/SourceRewriter.java | 15 ++- .../db/queryengine/plan/planner/plan/SubPlan.java | 5 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 9 ++ .../plan/planner/plan/node/PlanNodeType.java | 11 +- .../plan/planner/plan/node/PlanVisitor.java | 5 + .../node/process/last/LastQueryCollectNode.java | 17 ++- .../plan/node/process/last/LastQueryMergeNode.java | 24 ++-- .../plan/node/process/last/LastQueryNode.java | 27 ++++- ...ollectNode.java => LastQueryTransformNode.java} | 89 ++++++++------ .../impl/mem/mnode/factory/MemMNodeFactory.java | 4 +- .../impl/mem/mnode/impl/LogicalViewMNode.java | 64 ++++++++++ .../execution/operator/OperatorMemoryTest.java | 5 +- .../plan/plan/QueryLogicalPlanUtil.java | 2 +- .../plan/plan/distribution/LastQueryTest.java | 2 +- 27 files changed, 562 insertions(+), 131 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java index 1408207c828..b1150507b55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java @@ -44,7 +44,7 @@ public class LastQueryOperator implements ProcessOperator { private final OperatorContext operatorContext; - private final List<AbstractUpdateLastCacheOperator> children; + private final List<Operator> children; private final int inputOperatorsCount; @@ -53,9 +53,7 @@ public class LastQueryOperator implements ProcessOperator { private TsBlockBuilder tsBlockBuilder; public LastQueryOperator( - OperatorContext operatorContext, - List<AbstractUpdateLastCacheOperator> children, - TsBlockBuilder builder) { + OperatorContext operatorContext, List<Operator> children, TsBlockBuilder builder) { this.operatorContext = operatorContext; this.children = children; this.inputOperatorsCount = children.size(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java index f08aa156b21..aafdc71df9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java @@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator { private int cachedTsBlockRowIndex; // we must make sure that Operator in children has already been sorted - private final List<AbstractUpdateLastCacheOperator> children; + private final List<Operator> children; private final OperatorContext operatorContext; @@ -75,7 +75,7 @@ public class LastQuerySortOperator implements ProcessOperator { public LastQuerySortOperator( OperatorContext operatorContext, TsBlock cachedTsBlock, - List<AbstractUpdateLastCacheOperator> children, + List<Operator> children, Comparator<Binary> timeSeriesComparator) { this.cachedTsBlock = cachedTsBlock; this.cachedTsBlockSize = cachedTsBlock.getPositionCount(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java new file mode 100644 index 00000000000..0b87821bb1d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.last; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; + +import com.google.common.util.concurrent.ListenableFuture; + +public class LastQueryTransformOperator implements ProcessOperator { + + private String viewPath; + + private String dataType; + + private final OperatorContext operatorContext; + + // the child of LastQueryTransformOperator will always be AggOperator + private final Operator child; + + private TsBlockBuilder tsBlockBuilder; + + public LastQueryTransformOperator( + String viewPath, String dataType, OperatorContext operatorContext, Operator child) { + this.viewPath = viewPath; + this.dataType = dataType; + this.operatorContext = operatorContext; + this.child = child; + this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1); + } + + @Override + public OperatorContext getOperatorContext() { + return this.operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + public TsBlock next() throws Exception { + if (!tsBlockBuilder.isFull()) { + TsBlock tsBlock = child.nextWithTimer(); + if (tsBlock == null) { + return null; + } else if (!tsBlock.isEmpty()) { + if (tsBlock.getColumn(1).isNull(0)) { + return null; + } + LastQueryUtil.appendLastValue( + tsBlockBuilder, + tsBlock.getColumn(0).getLong(0), + viewPath, + tsBlock.getColumn(1).getTsPrimitiveType(0).getStringValue(), + dataType); + } + } else { + child.close(); + } + + TsBlock res = tsBlockBuilder.build(); + tsBlockBuilder.reset(); + return res; + } + + @Override + public boolean hasNext() throws Exception { + return child.hasNext(); + } + + @Override + public boolean isFinished() throws Exception { + return !hasNextWithTimer(); + } + + @Override + public long calculateMaxPeekMemory() { + return Math.max(child.calculateMaxPeekMemory(), child.calculateRetainedSizeAfterCallingNext()); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return child.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public void close() throws Exception { + if (child != null) { + child.close(); + } + tsBlockBuilder = null; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index caeb12e3295..341b4cd8b61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -209,6 +209,12 @@ public class Analysis { // timeseries, otherwise it will be null private Ordering timeseriesOrderingForLastQuery = null; + // Used to store view expression in last query which is non-writable + private Set<Expression> lastQueryNonWritableViewExpressions; + private Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap; + + private Set<Expression> lastQueryBaseExpressions; + // header of result dataset private DatasetHeader respDatasetHeader; @@ -344,7 +350,7 @@ public class Analysis { return null; } TSDataType type = expressionTypes.get(NodeRef.of(expression)); - checkArgument(type != null, "Expression not analyzed: %s", expression); + checkArgument(type != null, "Expression is not analyzed: %s", expression); return type; } @@ -726,6 +732,40 @@ public class Analysis { this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery; } + public Set<Expression> getLastQueryBaseExpressions() { + return this.lastQueryBaseExpressions; + } + + public void setLastQueryBaseExpressions(Set<Expression> lastQueryBaseExpressions) { + this.lastQueryBaseExpressions = lastQueryBaseExpressions; + } + + public Set<Expression> getLastQueryNonWritableViewExpressions() { + return this.lastQueryNonWritableViewExpressions; + } + + public void setLastQueryNonWritableViewExpression( + Set<Expression> lastQueryNonWritableViewExpression) { + this.lastQueryNonWritableViewExpressions = lastQueryNonWritableViewExpression; + } + + public Map<Expression, List<Expression>> getLastQueryNonWritableViewSourceExpressionMap() { + return this.lastQueryNonWritableViewSourceExpressionMap; + } + + public void setLastQueryNonWritableViewSourceExpressionMap( + Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap) { + this.lastQueryNonWritableViewSourceExpressionMap = lastQueryNonWritableViewSourceExpressionMap; + } + + public ModelInferenceDescriptor getModelInferenceDescriptor() { + return modelInferenceDescriptor; + } + + public void setModelInferenceDescriptor(ModelInferenceDescriptor modelInferenceDescriptor) { + this.modelInferenceDescriptor = modelInferenceDescriptor; + } + public Map<String, List<String>> getOutputDeviceToQueriedDevicesMap() { return outputDeviceToQueriedDevicesMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 051b55db58f..d3f8c49c6f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -435,20 +435,37 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> private void analyzeLastSource( Analysis analysis, List<Expression> selectExpressions, ISchemaTree schemaTree) { - Set<Expression> sourceExpressions; - - sourceExpressions = new LinkedHashSet<>(); + Set<Expression> sourceExpressions = new LinkedHashSet<>(); + Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>(); + Set<Expression> lastQueryNonWritableViewExpressions = null; + Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap = null; for (Expression selectExpression : selectExpressions) { - for (Expression sourceExpression : bindSchemaForExpression(selectExpression, schemaTree)) { - if (!(sourceExpression instanceof TimeSeriesOperand)) { - throw new SemanticException( - "Views with functions and expressions cannot be used in LAST query"); + for (Expression lastQuerySourceExpression : + bindSchemaForExpression(selectExpression, schemaTree)) { + if (lastQuerySourceExpression instanceof TimeSeriesOperand) { + lastQueryBaseExpressions.add(lastQuerySourceExpression); + sourceExpressions.add(lastQuerySourceExpression); + } else { + if (lastQueryNonWritableViewExpressions == null) { + lastQueryNonWritableViewExpressions = new LinkedHashSet<>(); + lastQueryNonWritableViewSourceExpressionMap = new HashMap<>(); + } + List<Expression> sourceExpressionsOfNonWritableView = + searchSourceExpressions(lastQuerySourceExpression); + lastQueryNonWritableViewExpressions.add(lastQuerySourceExpression); + lastQueryNonWritableViewSourceExpressionMap.put( + lastQuerySourceExpression, sourceExpressionsOfNonWritableView); + sourceExpressions.addAll(sourceExpressionsOfNonWritableView); } - sourceExpressions.add(sourceExpression); } } + analysis.setSourceExpressions(sourceExpressions); + analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions); + analysis.setLastQueryNonWritableViewExpression(lastQueryNonWritableViewExpressions); + analysis.setLastQueryNonWritableViewSourceExpressionMap( + lastQueryNonWritableViewSourceExpressionMap); } private void updateSchemaTreeByViews(Analysis analysis, ISchemaTree originSchemaTree) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java index 20f5eb711db..3ada99d8ebf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java @@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; /** this interface declares the operations of LastCache data */ public interface ILastCacheContainer { - // get lastCache of monad timseries + // get lastCache of monad timeseries TimeValuePair getCachedLast(); /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index fbd80dbe22f..42eb082ea5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -71,6 +71,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -106,11 +107,14 @@ import org.apache.commons.lang.Validate; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -121,7 +125,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE; +import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME; public class LogicalPlanBuilder { @@ -214,16 +221,13 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planLast( - Set<Expression> sourceExpressions, Filter globalTimeFilter, Ordering timeseriesOrdering) { - List<PlanNode> sourceNodeList = new ArrayList<>(); + Analysis analysis, Ordering timeseriesOrdering, Ordering resultTimeOrder, ZoneId zoneId) { + Set<String> deviceAlignedSet = new HashSet<>(); + Set<String> deviceExistViewSet = new HashSet<>(); + // <Device, <Measurement, Expression>> + Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new LinkedHashMap<>(); - Map<String, Boolean> deviceAlignedMap = new HashMap<>(); - Map<String, Boolean> deviceExistViewMap = new HashMap<>(); - Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = - timeseriesOrdering != null - ? new TreeMap<>(timeseriesOrdering.getStringComparator()) - : new LinkedHashMap<>(); - for (Expression sourceExpression : sourceExpressions) { + for (Expression sourceExpression : analysis.getLastQueryBaseExpressions()) { MeasurementPath outputPath = (MeasurementPath) (sourceExpression.isViewExpression() @@ -238,46 +242,47 @@ public class LogicalPlanBuilder { ? new TreeMap<>(timeseriesOrdering.getStringComparator()) : new LinkedHashMap<>()) .put(outputPath.getMeasurement(), sourceExpression); - if (!deviceAlignedMap.containsKey(outputDevice)) { - deviceAlignedMap.put(outputDevice, outputPath.isUnderAlignedEntity()); + if (outputPath.isUnderAlignedEntity()) { + deviceAlignedSet.add(outputDevice); + } + if (sourceExpression.isViewExpression()) { + deviceExistViewSet.add(outputDevice); } - deviceExistViewMap.put( - outputDevice, - deviceExistViewMap.getOrDefault(outputDevice, false) - || sourceExpression.isViewExpression()); } + List<PlanNode> sourceNodeList = new ArrayList<>(); for (Map.Entry<String, Map<String, Expression>> deviceMeasurementExpressionEntry : outputPathToSourceExpressionMap.entrySet()) { String outputDevice = deviceMeasurementExpressionEntry.getKey(); - if (deviceExistViewMap.get(outputDevice)) { + Map<String, Expression> measurementToExpressionsOfDevice = + deviceMeasurementExpressionEntry.getValue(); + if (deviceExistViewSet.contains(outputDevice)) { // exist view - for (Expression sourceExpression : deviceMeasurementExpressionEntry.getValue().values()) { + for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); + String outputViewPath = + sourceExpression.isViewExpression() + ? sourceExpression.getViewPath().getFullPath() + : null; + if (selectedPath.isUnderAlignedEntity()) { // aligned series sourceNodeList.add( new AlignedLastQueryScanNode( context.getQueryId().genPlanNodeId(), new AlignedPath(selectedPath), - sourceExpression.isViewExpression() - ? sourceExpression.getViewPath().getFullPath() - : null)); + outputViewPath)); } else { // non-aligned series sourceNodeList.add( new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), - selectedPath, - sourceExpression.isViewExpression() - ? sourceExpression.getViewPath().getFullPath() - : null)); + context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath)); } } } else { - if (deviceAlignedMap.get(outputDevice)) { + if (deviceAlignedSet.contains(outputDevice)) { // aligned series List<MeasurementPath> measurementPaths = - deviceMeasurementExpressionEntry.getValue().values().stream() + measurementToExpressionsOfDevice.values().stream() .map(expression -> (MeasurementPath) ((TimeSeriesOperand) expression).getPath()) .collect(Collectors.toList()); AlignedPath alignedPath = new AlignedPath(measurementPaths.get(0).getDevicePath()); @@ -289,7 +294,7 @@ public class LogicalPlanBuilder { context.getQueryId().genPlanNodeId(), alignedPath, null)); } else { // non-aligned series - for (Expression sourceExpression : deviceMeasurementExpressionEntry.getValue().values()) { + for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); sourceNodeList.add( @@ -299,12 +304,81 @@ public class LogicalPlanBuilder { } } + Set<Expression> lastQueryNonWriteViewExpressions = + analysis.getLastQueryNonWritableViewExpressions(); + Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap = + analysis.getLastQueryNonWritableViewSourceExpressionMap(); + if (lastQueryNonWriteViewExpressions != null) { + for (Expression expression : lastQueryNonWriteViewExpressions) { + Set<Expression> sourceTransformExpressions = Collections.singleton(expression); + FunctionExpression maxTimeAgg = + new FunctionExpression( + MAX_TIME, new LinkedHashMap<>(), Collections.singletonList(expression)); + FunctionExpression lastValueAgg = + new FunctionExpression( + LAST_VALUE, new LinkedHashMap<>(), Collections.singletonList(expression)); + analyzeExpression(analysis, expression); + analyzeExpression(analysis, maxTimeAgg); + analyzeExpression(analysis, lastValueAgg); + + Set<Expression> sources = + new LinkedHashSet<>(lastQueryNonWritableViewSourceExpressionMap.get(expression)); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); + planBuilder = + planBuilder + .planRawDataSource( + sources, + resultTimeOrder, + analysis.getGlobalTimeFilter(), + analysis.isLastLevelUseWildcard()) + .planWhereAndSourceTransform( + null, sourceTransformExpressions, false, zoneId, resultTimeOrder) + .planAggregation( + new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)), + null, + analysis.getGroupByTimeParameter(), + analysis.getGroupByParameter(), + false, + AggregationStep.SINGLE, + resultTimeOrder); + + LastQueryTransformNode transformNode = + new LastQueryTransformNode( + context.getQueryId().genPlanNodeId(), + planBuilder.getRoot(), + expression.getViewPath().getFullPath(), + analysis.getType(expression).toString()); + sourceNodeList.add(transformNode); + } + } + + if (timeseriesOrdering != null) { + sourceNodeList.sort( + Comparator.comparing( + child -> { + String sortKey = ""; + if (child instanceof LastQueryScanNode) { + sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); + } else if (child instanceof AlignedLastQueryScanNode) { + sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort(); + } else if (child instanceof LastQueryTransformNode) { + sortKey = ((LastQueryTransformNode) child).getOutputSymbolForSort(); + } + return sortKey; + })); + if (timeseriesOrdering.equals(Ordering.DESC)) { + Collections.reverse(sourceNodeList); + } + } + this.root = new LastQueryNode( context.getQueryId().genPlanNodeId(), sourceNodeList, - globalTimeFilter, - timeseriesOrdering); + analysis.getGlobalTimeFilter(), + timeseriesOrdering, + lastQueryNonWriteViewExpressions != null); + ColumnHeaderConstant.lastQueryColumnHeaders.forEach( columnHeader -> context diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index d5180aacf46..68fd72004de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -92,6 +92,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.commons.lang3.StringUtils; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -99,6 +100,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; /** @@ -127,9 +129,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte planBuilder = planBuilder .planLast( - analysis.getSourceExpressions(), - analysis.getGlobalTimeFilter(), - analysis.getTimeseriesOrderingForLastQuery()) + analysis, + analysis.getTimeseriesOrderingForLastQuery(), + queryStatement.getResultTimeOrder(), + queryStatement.getSelectComponent().getZoneId()) .planOffset(queryStatement.getRowOffset()) .planLimit(queryStatement.getRowLimit()); @@ -599,7 +602,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) { PlanNode lastPlanNode = new LogicalPlanBuilder(analysis, context) - .planLast(analysis.getSourceExpressions(), analysis.getGlobalTimeFilter(), null) + .planLast(analysis, null, ASC, ZoneId.systemDefault()) .getRoot(); planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 3585b301e10..628d8ba879a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -92,13 +92,13 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.Mul import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator; -import org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateLastCacheOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateViewPathLastCacheOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQuerySortOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryTransformOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateLastCacheOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateViewPathLastCacheOperator; @@ -179,6 +179,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; @@ -2020,8 +2021,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) { PartialPath seriesPath = node.getSeriesPath().transformToPartialPath(); TimeValuePair timeValuePair = null; + context.dataNodeQueryContext.lock(); try { - context.dataNodeQueryContext.lock(); if (!context.dataNodeQueryContext.unCached(seriesPath)) { timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath); if (timeValuePair == null) { @@ -2279,16 +2280,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP @Override public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext context) { - context.setLastQueryTimeFilter(node.getTimeFilter()); context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter())); context.setNeedUpdateNullEntry(LastQueryUtil.needUpdateNullEntry(node.getTimeFilter())); - List<AbstractUpdateLastCacheOperator> operatorList = + List<Operator> operatorList = node.getChildren().stream() .map(child -> child.accept(this, context)) .filter(Objects::nonNull) - .map(o -> (AbstractUpdateLastCacheOperator) o) .collect(Collectors.toList()); List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList = @@ -2390,6 +2389,23 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return new LastQueryCollectOperator(operatorContext, children); } + @Override + public Operator visitLastQueryTransform( + LastQueryTransformNode node, LocalExecutionPlanContext context) { + Operator operator = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + LastQueryCollectOperator.class.getSimpleName()); + + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new LastQueryTransformOperator( + node.getViewPath(), node.getDataType(), operatorContext, operator); + } + private Map<String, List<InputLocation>> makeLayout(PlanNode node) { Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>(); int tsBlockIndex = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java index 12b083c9af9..b8920cffbc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; @@ -128,19 +129,33 @@ public class SubPlanTypeExtractor { @Override public Void visitLastQuery(LastQueryNode node, Void context) { + if (node.isContainsLastTransformNode()) { + return visitPlan(node, context); + } return null; } @Override public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) { + if (node.isContainsLastTransformNode()) { + return visitPlan(node, context); + } return null; } @Override public Void visitLastQueryCollect(LastQueryCollectNode node, Void context) { + if (node.isContainsLastTransformNode()) { + return visitPlan(node, context); + } return null; } + @Override + public Void visitLastQueryTransform(LastQueryTransformNode node, Void context) { + return visitPlan(node, context); + } + // end region PlanNode of last read private void updateTypeProviderByAggregationDescriptor( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java index 2f14ffcd112..087ce5338eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java @@ -196,7 +196,7 @@ public class DistributionPlanner { List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan); // Only execute this step for READ operation if (context.getQueryType() == QueryType.READ) { - SetSinkForRootInstance(subPlan, fragmentInstances); + setSinkForRootInstance(subPlan, fragmentInstances); } return new DistributedQueryPlan( logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances); @@ -213,7 +213,7 @@ public class DistributionPlanner { } // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ? - public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) { + public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) { FragmentInstance rootInstance = null; for (FragmentInstance instance : instances) { if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index b031ca9947e..6f2cce9a83f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -229,6 +230,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processMultiChildNode(node, context); } + @Override + public PlanNode visitLastQueryTransform(LastQueryTransformNode node, NodeGroupContext context) { + return processOneChildNode(node, context); + } + @Override public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) { return processMultiChildNode(node, context); @@ -282,11 +288,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone(); List<PlanNode> visitedChildren = new ArrayList<>(); - node.getChildren() - .forEach( - child -> { - visitedChildren.add(visit(child, context)); - }); + node.getChildren().forEach(child -> visitedChildren.add(visit(child, context))); TRegionReplicaSet dataRegion; NodeDistributionType distributionType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 2bf79e2a113..ff24932bca6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -183,7 +184,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { }); if (analysis.getStatement() instanceof QueryStatement - || analysis.getStatement() instanceof ShowQueriesStatement) { + || analysis.getStatement() instanceof ShowQueriesStatement + || (analysis.getStatement() instanceof ShowTimeSeriesStatement + && ((ShowTimeSeriesStatement) analysis.getStatement()).isOrderByHeat())) { fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider()); } instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 5fd9c4972a6..91e818cb43c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -480,7 +480,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte LastQueryScanNode node, DistributionPlanContext context) { LastQueryNode mergeNode = new LastQueryNode( - context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), null); + context.queryContext.getQueryId().genPlanNodeId(), + node.getPartitionTimeFilter(), + null, + false); return processRawSeriesScan(node, context, mergeNode); } @@ -489,7 +492,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte AlignedLastQueryScanNode node, DistributionPlanContext context) { LastQueryNode mergeNode = new LastQueryNode( - context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), null); + context.queryContext.getQueryId().genPlanNodeId(), + node.getPartitionTimeFilter(), + null, + false); return processRawSeriesScan(node, context, mergeNode); } @@ -678,9 +684,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // if the series is from multi regions or order by clause only refer to timeseries, use // LastQueryMergeNode if (context.oneSeriesInMultiRegion || node.needOrderByTimeseries()) { - return new LastQueryMergeNode(id, node.getTimeseriesOrdering()); + return new LastQueryMergeNode( + id, node.getTimeseriesOrdering(), node.isContainsLastTransformNode()); } - return new LastQueryCollectNode(id); + return new LastQueryCollectNode(id, node.isContainsLastTransformNode()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java index 38741d91e0e..0bceb53eb5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java @@ -60,10 +60,7 @@ public class SubPlan { public List<PlanFragment> getPlanFragmentList() { List<PlanFragment> result = new ArrayList<>(); result.add(this.planFragment); - this.children.forEach( - child -> { - result.addAll(child.getPlanFragmentList()); - }); + this.children.forEach(child -> result.addAll(child.getPlanFragmentList())); return result; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index ec63414d2f7..820a41352aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; @@ -423,6 +424,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitLastQueryTransform(LastQueryTransformNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("LastQueryTransform-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("ViewPath: %s", node.getViewPath())); + return render(node, boxValue, context); + } + @Override public List<String> visitHorizontallyConcat(HorizontallyConcatNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 2364aef9d7e..5c2a9b56605 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; @@ -180,7 +181,8 @@ public enum PlanNodeType { LOGICAL_VIEW_SCHEMA_SCAN((short) 77), ALTER_LOGICAL_VIEW((short) 78), PIPE_ENRICHED_INSERT((short) 79), - ; + FORECAST((short) 80), + LAST_QUERY_TRANSFORM((short) 81); public static final int BYTES = Short.BYTES; @@ -385,6 +387,13 @@ public enum PlanNodeType { return AlterLogicalViewNode.deserialize(buffer); case 79: return PipeEnrichedInsertNode.deserialize(buffer); +<<<<<<< HEAD +======= + case 80: + return ForecastNode.deserialize(buffer); + case 81: + return LastQueryTransformNode.deserialize(buffer); +>>>>>>> 5f5a1d4fb7 (Enhance last query, support non single base series (#11120)) default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index e6eb22fc5a2..965e83beb15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; @@ -234,6 +235,10 @@ public abstract class PlanVisitor<R, C> { return visitMultiChildProcess(node, context); } + public R visitLastQueryTransform(LastQueryTransformNode node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitMergeSort(MergeSortNode node, C context) { return visitMultiChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java index e38b851e8dd..fbba5c6d336 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java @@ -34,8 +34,11 @@ import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.Last public class LastQueryCollectNode extends MultiChildProcessNode { - public LastQueryCollectNode(PlanNodeId id) { + private boolean containsLastTransformNode; + + public LastQueryCollectNode(PlanNodeId id, boolean containsLastTransformNode) { super(id); + this.containsLastTransformNode = containsLastTransformNode; } public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) { @@ -54,7 +57,7 @@ public class LastQueryCollectNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new LastQueryCollectNode(getPlanNodeId()); + return new LastQueryCollectNode(getPlanNodeId(), containsLastTransformNode); } @Override @@ -99,11 +102,19 @@ public class LastQueryCollectNode extends MultiChildProcessNode { public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) { PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryCollectNode(planNodeId); + return new LastQueryCollectNode(planNodeId, false); } @Override public void setChildren(List<PlanNode> children) { this.children = children; } + + public boolean isContainsLastTransformNode() { + return this.containsLastTransformNode; + } + + public void setContainsLastQueryTransformNode() { + this.containsLastTransformNode = true; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java index 3c240f48903..7bdc4b245b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java @@ -40,14 +40,14 @@ public class LastQueryMergeNode extends MultiChildProcessNode { // The size of this list is 2 and the first SortItem in this list has higher priority. private final Ordering timeseriesOrdering; - public LastQueryMergeNode(PlanNodeId id, Ordering timeseriesOrdering) { - super(id); - this.timeseriesOrdering = timeseriesOrdering; - } + // if children contains LastTransformNode + private boolean containsLastTransformNode; - public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Ordering timeseriesOrdering) { - super(id, children); + public LastQueryMergeNode( + PlanNodeId id, Ordering timeseriesOrdering, boolean containsLastTransformNode) { + super(id); this.timeseriesOrdering = timeseriesOrdering; + this.containsLastTransformNode = containsLastTransformNode; } @Override @@ -62,7 +62,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering); + return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering, containsLastTransformNode); } @Override @@ -135,7 +135,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode { timeseriesOrdering = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; } PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryMergeNode(planNodeId, timeseriesOrdering); + return new LastQueryMergeNode(planNodeId, timeseriesOrdering, false); } @Override @@ -146,4 +146,12 @@ public class LastQueryMergeNode extends MultiChildProcessNode { public Ordering getTimeseriesOrdering() { return timeseriesOrdering; } + + public boolean isContainsLastTransformNode() { + return this.containsLastTransformNode; + } + + public void setContainsNonWritableView() { + this.containsLastTransformNode = true; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java index a47810a0076..0d6ade61aad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java @@ -46,20 +46,30 @@ public class LastQueryNode extends MultiChildProcessNode { // which is set to null if there is no need to sort private Ordering timeseriesOrdering; - public LastQueryNode(PlanNodeId id, Filter timeFilter, @Nullable Ordering timeseriesOrdering) { + // if children contains LastTransformNode, this variable is only used in distribute plan + private boolean containsLastTransformNode; + + public LastQueryNode( + PlanNodeId id, + Filter timeFilter, + @Nullable Ordering timeseriesOrdering, + boolean containsLastTransformNode) { super(id); this.timeFilter = timeFilter; this.timeseriesOrdering = timeseriesOrdering; + this.containsLastTransformNode = containsLastTransformNode; } public LastQueryNode( PlanNodeId id, List<PlanNode> children, Filter timeFilter, - @Nullable Ordering timeseriesOrdering) { + @Nullable Ordering timeseriesOrdering, + boolean containsLastTransformNode) { super(id, children); this.timeFilter = timeFilter; this.timeseriesOrdering = timeseriesOrdering; + this.containsLastTransformNode = containsLastTransformNode; } @Override @@ -74,7 +84,8 @@ public class LastQueryNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new LastQueryNode(getPlanNodeId(), timeFilter, timeseriesOrdering); + return new LastQueryNode( + getPlanNodeId(), timeFilter, timeseriesOrdering, containsLastTransformNode); } @Override @@ -163,7 +174,7 @@ public class LastQueryNode extends MultiChildProcessNode { timeseriesOrdering = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; } PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering); + return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering, false); } @Override @@ -184,6 +195,14 @@ public class LastQueryNode extends MultiChildProcessNode { this.timeseriesOrdering = timeseriesOrdering; } + public boolean isContainsLastTransformNode() { + return this.containsLastTransformNode; + } + + public void setContainsLastTransformNode() { + this.containsLastTransformNode = true; + } + public boolean needOrderByTimeseries() { return timeseriesOrdering != null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java similarity index 53% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java index e38b851e8dd..fb2133f7dc2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java @@ -16,13 +16,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; @@ -32,34 +34,27 @@ import java.util.Objects; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class LastQueryCollectNode extends MultiChildProcessNode { +public class LastQueryTransformNode extends SingleChildProcessNode { - public LastQueryCollectNode(PlanNodeId id) { - super(id); - } + private final String viewPath; - public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) { - super(id, children); - } + private final String dataType; - @Override - public List<PlanNode> getChildren() { - return children; + public LastQueryTransformNode(PlanNodeId id, String viewPath, String dataType) { + super(id); + this.viewPath = viewPath; + this.dataType = dataType; } - @Override - public void addChild(PlanNode child) { - children.add(child); + public LastQueryTransformNode(PlanNodeId id, PlanNode aggNode, String viewPath, String dataType) { + super(id, aggNode); + this.viewPath = viewPath; + this.dataType = dataType; } @Override public PlanNode clone() { - return new LastQueryCollectNode(getPlanNodeId()); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; + return new LastQueryTransformNode(getPlanNodeId(), viewPath, dataType); } @Override @@ -68,42 +63,60 @@ public class LastQueryCollectNode extends MultiChildProcessNode { } @Override - public String toString() { - return String.format("LastQueryCollectNode-%s", this.getPlanNodeId()); + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.LAST_QUERY_TRANSFORM.serialize(byteBuffer); + ReadWriteIOUtils.write(viewPath, byteBuffer); + ReadWriteIOUtils.write(dataType, byteBuffer); } @Override - public boolean equals(Object o) { - return super.equals(o); + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.LAST_QUERY_TRANSFORM.serialize(stream); + ReadWriteIOUtils.write(viewPath, stream); + ReadWriteIOUtils.write(dataType, stream); } - @Override - public int hashCode() { - return Objects.hash(super.hashCode()); + public static LastQueryTransformNode deserialize(ByteBuffer byteBuffer) { + String viewPath = ReadWriteIOUtils.readString(byteBuffer); + String dataType = ReadWriteIOUtils.readString(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new LastQueryTransformNode(planNodeId, viewPath, dataType); } @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitLastQueryCollect(this, context); + return visitor.visitLastQueryTransform(this, context); } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.LAST_QUERY_COLLECT.serialize(byteBuffer); + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LastQueryTransformNode that = (LastQueryTransformNode) o; + return viewPath.equals(that.viewPath) && dataType.equals(that.dataType); } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.LAST_QUERY_COLLECT.serialize(stream); + public int hashCode() { + return Objects.hash(super.hashCode(), viewPath, dataType); } - public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) { - PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryCollectNode(planNodeId); + public String getViewPath() { + return this.viewPath; } - @Override - public void setChildren(List<PlanNode> children) { - this.children = children; + public String getDataType() { + return this.dataType; + } + + public String getOutputSymbolForSort() { + return viewPath; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java index 62f700c398c..a48d21d9230 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.B import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DatabaseDeviceMNode; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DatabaseMNode; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DeviceMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.LogicalViewMNode; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.MeasurementMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.LogicalViewInfo; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @MNodeFactory @@ -76,6 +78,6 @@ public class MemMNodeFactory implements IMNodeFactory<IMemMNode> { @Override public IMeasurementMNode<IMemMNode> createLogicalViewMNode( IDeviceMNode<IMemMNode> parent, String name, IMeasurementInfo measurementInfo) { - throw new UnsupportedOperationException("View is not supported."); + return new LogicalViewMNode(parent, name, ((LogicalViewInfo) measurementInfo).getExpression()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java new file mode 100644 index 00000000000..4b7382f300c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl; + +import org.apache.iotdb.commons.schema.node.common.AbstractMeasurementMNode; +import org.apache.iotdb.commons.schema.node.info.IMeasurementInfo; +import org.apache.iotdb.commons.schema.node.role.IDeviceMNode; +import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer; +import org.apache.iotdb.commons.schema.view.LogicalViewSchema; +import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.basic.BasicMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.container.MemMNodeContainer; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.LogicalViewInfo; + +public class LogicalViewMNode extends AbstractMeasurementMNode<IMemMNode, BasicMNode> + implements IMemMNode { + + public LogicalViewMNode( + IDeviceMNode<IMemMNode> parent, String name, ViewExpression viewExpression) { + super( + new BasicMNode(parent == null ? null : parent.getAsMNode(), name), + new LogicalViewInfo(new LogicalViewSchema(name, viewExpression))); + } + + @Override + public IMNodeContainer<IMemMNode> getChildren() { + return MemMNodeContainer.emptyMNodeContainer(); + } + + @Override + public IMemMNode getAsMNode() { + return this; + } + + public void setExpression(ViewExpression expression) { + IMeasurementInfo measurementInfo = this.getMeasurementInfo(); + if (measurementInfo instanceof LogicalViewInfo) { + ((LogicalViewInfo) measurementInfo).setExpression(expression); + } + } + + @Override + public final boolean isLogicalView() { + return true; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index 539d90bc84d..0231def964d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -48,7 +48,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill; import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.LinearFill; import org.apache.iotdb.db.queryengine.execution.operator.process.join.HorizontallyConcatOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.RowBasedTimeJoinOperator; -import org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator; @@ -346,7 +345,7 @@ public class OperatorMemoryTest { public void lastQueryOperatorTest() { TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class); Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L); - List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4); + List<Operator> children = new ArrayList<>(4); long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; for (int i = 0; i < 4; i++) { UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class); @@ -376,7 +375,7 @@ public class OperatorMemoryTest { TsBlock tsBlock = Mockito.mock(TsBlock.class); Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L); Mockito.when(tsBlock.getPositionCount()).thenReturn(16); - List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4); + List<Operator> children = new ArrayList<>(4); for (int i = 0; i < 4; i++) { UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java index b36feab3cff..87ffe9baca7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java @@ -143,7 +143,7 @@ public class QueryLogicalPlanUtil { LastQueryNode lastQueryNode = new LastQueryNode( - queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), Ordering.ASC); + queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), Ordering.ASC, false); querySQLs.add(sql); sqlToPlanMap.put(sql, lastQueryNode); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java index db875ef9b38..13d1586d475 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java @@ -209,7 +209,7 @@ public class LastQueryTest { } PlanNode root = - new LastQueryNode(context.getQueryId().genPlanNodeId(), sourceNodeList, null, null); + new LastQueryNode(context.getQueryId().genPlanNodeId(), sourceNodeList, null, null, false); return new LogicalQueryPlan(context, root); } }
