This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new f0ce39615e3 [To rel/1.2] Cherry-pick last query feature for non-base
view
f0ce39615e3 is described below
commit f0ce39615e336537c197beabc2dabdb6c47e528f
Author: Beyyes <[email protected]>
AuthorDate: Thu Sep 28 10:01:20 2023 +0800
[To rel/1.2] Cherry-pick last query feature for non-base view
---
.../operator/process/last/LastQueryOperator.java | 6 +-
.../process/last/LastQuerySortOperator.java | 4 +-
.../process/last/LastQueryTransformOperator.java | 120 ++++++++++++++++++
.../db/queryengine/plan/analyze/Analysis.java | 24 +++-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 29 +++--
.../schema/lastcache/ILastCacheContainer.java | 2 +-
.../plan/planner/LogicalPlanBuilder.java | 138 ++++++++++++++++-----
.../plan/planner/LogicalPlanVisitor.java | 10 +-
.../plan/planner/OperatorTreeGenerator.java | 26 +++-
.../plan/planner/SubPlanTypeExtractor.java | 15 +++
.../distribution/DistributionPlanContext.java | 5 +-
.../planner/distribution/DistributionPlanner.java | 10 +-
.../planner/distribution/ExchangeNodeAdder.java | 82 ++++++------
.../planner/distribution/NodeGroupContext.java | 18 ++-
.../SimpleFragmentParallelPlanner.java | 5 +-
.../plan/planner/distribution/SourceRewriter.java | 91 +++++++-------
.../db/queryengine/plan/planner/plan/SubPlan.java | 5 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 9 ++
.../plan/planner/plan/node/PlanNodeType.java | 6 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../node/process/last/LastQueryCollectNode.java | 17 ++-
.../plan/node/process/last/LastQueryMergeNode.java | 24 ++--
.../plan/node/process/last/LastQueryNode.java | 27 +++-
...ollectNode.java => LastQueryTransformNode.java} | 92 ++++++++------
.../planner/plan/node/sink/IdentitySinkNode.java | 5 +
.../plan/node/source/AlignedSeriesScanNode.java | 4 +-
.../execution/operator/OperatorMemoryTest.java | 5 +-
.../plan/plan/QueryLogicalPlanUtil.java | 2 +-
.../plan/plan/distribution/LastQueryTest.java | 2 +-
29 files changed, 569 insertions(+), 219 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
index 1408207c828..b1150507b55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
@@ -44,7 +44,7 @@ public class LastQueryOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final List<AbstractUpdateLastCacheOperator> children;
+ private final List<Operator> children;
private final int inputOperatorsCount;
@@ -53,9 +53,7 @@ public class LastQueryOperator implements ProcessOperator {
private TsBlockBuilder tsBlockBuilder;
public LastQueryOperator(
- OperatorContext operatorContext,
- List<AbstractUpdateLastCacheOperator> children,
- TsBlockBuilder builder) {
+ OperatorContext operatorContext, List<Operator> children, TsBlockBuilder
builder) {
this.operatorContext = operatorContext;
this.children = children;
this.inputOperatorsCount = children.size();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
index f08aa156b21..aafdc71df9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
@@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator
{
private int cachedTsBlockRowIndex;
// we must make sure that Operator in children has already been sorted
- private final List<AbstractUpdateLastCacheOperator> children;
+ private final List<Operator> children;
private final OperatorContext operatorContext;
@@ -75,7 +75,7 @@ public class LastQuerySortOperator implements ProcessOperator
{
public LastQuerySortOperator(
OperatorContext operatorContext,
TsBlock cachedTsBlock,
- List<AbstractUpdateLastCacheOperator> children,
+ List<Operator> children,
Comparator<Binary> timeSeriesComparator) {
this.cachedTsBlock = cachedTsBlock;
this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
new file mode 100644
index 00000000000..0b87821bb1d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.last;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Process operator that reads blocks from a single child and re-emits row 0 of each non-empty
+ * block in the LAST-query result layout, using {@code viewPath} as the reported series name.
+ *
+ * <p>For every non-empty child block, column 0 (row 0) is read as a {@code long} and column 1
+ * (row 0) is rendered as a string; both are appended through {@code
+ * LastQueryUtil.appendLastValue} together with {@code viewPath} and {@code dataType}.
+ * NOTE(review): column 0 / column 1 are presumably the max_time / last_value aggregation results
+ * for the view expression; confirm against the planner.
+ */
+public class LastQueryTransformOperator implements ProcessOperator {
+
+  // path reported for every row appended by this operator; never reassigned
+  private final String viewPath;
+
+  // data type name appended next to each value; never reassigned
+  private final String dataType;
+
+  private final OperatorContext operatorContext;
+
+  // the child of LastQueryTransformOperator will always be AggOperator
+  private final Operator child;
+
+  // not final: close() drops the reference
+  private TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param viewPath path written into the result rows
+   * @param dataType data type name written into the result rows
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator whose blocks are transformed
+   */
+  public LastQueryTransformOperator(
+      String viewPath, String dataType, OperatorContext operatorContext, Operator child) {
+    this.viewPath = viewPath;
+    this.dataType = dataType;
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return this.operatorContext;
+  }
+
+  /** Blocked exactly when the child is blocked. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Pulls one block from the child and converts it.
+   *
+   * @return {@code null} when the child returns {@code null} or when column 1, row 0 of a
+   *     non-empty child block is null; otherwise the block built from the builder, which holds
+   *     no appended row when the child block was empty
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (!tsBlockBuilder.isFull()) {
+      TsBlock tsBlock = child.nextWithTimer();
+      if (tsBlock == null) {
+        return null;
+      } else if (!tsBlock.isEmpty()) {
+        // a null value column yields no row for this view
+        if (tsBlock.getColumn(1).isNull(0)) {
+          return null;
+        }
+        LastQueryUtil.appendLastValue(
+            tsBlockBuilder,
+            tsBlock.getColumn(0).getLong(0),
+            viewPath,
+            tsBlock.getColumn(1).getTsPrimitiveType(0).getStringValue(),
+            dataType);
+      }
+    } else {
+      // NOTE(review): builder already full, so the child is closed instead of being read again;
+      // the intent is not visible from this class - confirm.
+      child.close();
+    }
+
+    // hand out whatever was accumulated and start the next call with an empty builder
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  /** Larger of the child's peak memory and the child's retained size after {@code next()}. */
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(
+        child.calculateMaxPeekMemory(), child.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /** Closes the child (when present) and releases the block builder. */
+  @Override
+  public void close() throws Exception {
+    if (child != null) {
+      child.close();
+    }
+    tsBlockBuilder = null;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index caeb12e3295..86528182392 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -209,6 +209,11 @@ public class Analysis {
// timeseries, otherwise it will be null
private Ordering timeseriesOrderingForLastQuery = null;
+ // Key: non-writable view expression, Value: corresponding source expressions
+ private Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap;
+
+ private Set<Expression> lastQueryBaseExpressions;
+
// header of result dataset
private DatasetHeader respDatasetHeader;
@@ -344,7 +349,7 @@ public class Analysis {
return null;
}
TSDataType type = expressionTypes.get(NodeRef.of(expression));
- checkArgument(type != null, "Expression not analyzed: %s", expression);
+ checkArgument(type != null, "Expression is not analyzed: %s", expression);
return type;
}
@@ -726,6 +731,23 @@ public class Analysis {
this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
}
+ public Set<Expression> getLastQueryBaseExpressions() { // plain accessor, returns the stored reference
+ return this.lastQueryBaseExpressions; // null until setLastQueryBaseExpressions has been called
+ }
+
+ public void setLastQueryBaseExpressions(Set<Expression>
lastQueryBaseExpressions) { // keeps the caller's set as-is, no defensive copy
+ this.lastQueryBaseExpressions = lastQueryBaseExpressions;
+ }
+
+ public Map<Expression, List<Expression>>
getLastQueryNonWritableViewSourceExpressionMap() { // key: non-writable view expression, value: its source expressions
+ return this.lastQueryNonWritableViewSourceExpressionMap; // may be null: the field has no initializer
+ }
+
+ public void setLastQueryNonWritableViewSourceExpressionMap(
+ Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap) { // reference is stored unchecked, so null is accepted
+ this.lastQueryNonWritableViewSourceExpressionMap =
lastQueryNonWritableViewSourceExpressionMap;
+ }
+
public Map<String, List<String>> getOutputDeviceToQueriedDevicesMap() {
return outputDeviceToQueriedDevicesMap;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 051b55db58f..87e9a6b30f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -435,20 +435,33 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
private void analyzeLastSource(
Analysis analysis, List<Expression> selectExpressions, ISchemaTree
schemaTree) {
- Set<Expression> sourceExpressions;
-
- sourceExpressions = new LinkedHashSet<>();
+ Set<Expression> sourceExpressions = new LinkedHashSet<>();
+ Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>();
+ Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap = null;
for (Expression selectExpression : selectExpressions) {
- for (Expression sourceExpression :
bindSchemaForExpression(selectExpression, schemaTree)) {
- if (!(sourceExpression instanceof TimeSeriesOperand)) {
- throw new SemanticException(
- "Views with functions and expressions cannot be used in LAST
query");
+ for (Expression lastQuerySourceExpression :
+ bindSchemaForExpression(selectExpression, schemaTree)) {
+ if (lastQuerySourceExpression instanceof TimeSeriesOperand) {
+ lastQueryBaseExpressions.add(lastQuerySourceExpression);
+ sourceExpressions.add(lastQuerySourceExpression);
+ } else {
+ if (lastQueryNonWritableViewSourceExpressionMap == null) {
+ lastQueryNonWritableViewSourceExpressionMap = new HashMap<>();
+ }
+ List<Expression> sourceExpressionsOfNonWritableView =
+ searchSourceExpressions(lastQuerySourceExpression);
+ lastQueryNonWritableViewSourceExpressionMap.put(
+ lastQuerySourceExpression, sourceExpressionsOfNonWritableView);
+ sourceExpressions.addAll(sourceExpressionsOfNonWritableView);
}
- sourceExpressions.add(sourceExpression);
}
}
+
analysis.setSourceExpressions(sourceExpressions);
+ analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions);
+ analysis.setLastQueryNonWritableViewSourceExpressionMap(
+ lastQueryNonWritableViewSourceExpressionMap);
}
private void updateSchemaTreeByViews(Analysis analysis, ISchemaTree
originSchemaTree) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
index 20f5eb711db..3ada99d8ebf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
/** this interface declares the operations of LastCache data */
public interface ILastCacheContainer {
- // get lastCache of monad timseries
+ // get lastCache of monad timeseries
TimeValuePair getCachedLast();
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index fbd80dbe22f..ee7472df6fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -71,6 +71,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -106,11 +107,14 @@ import org.apache.commons.lang.Validate;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -121,7 +125,10 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
public class LogicalPlanBuilder {
@@ -214,16 +221,13 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planLast(
- Set<Expression> sourceExpressions, Filter globalTimeFilter, Ordering
timeseriesOrdering) {
- List<PlanNode> sourceNodeList = new ArrayList<>();
+ Analysis analysis, Ordering timeseriesOrdering, ZoneId zoneId) {
+ Set<String> deviceAlignedSet = new HashSet<>();
+ Set<String> deviceExistViewSet = new HashSet<>();
+ // <Device, <Measurement, Expression>>
+ Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new
LinkedHashMap<>();
- Map<String, Boolean> deviceAlignedMap = new HashMap<>();
- Map<String, Boolean> deviceExistViewMap = new HashMap<>();
- Map<String, Map<String, Expression>> outputPathToSourceExpressionMap =
- timeseriesOrdering != null
- ? new TreeMap<>(timeseriesOrdering.getStringComparator())
- : new LinkedHashMap<>();
- for (Expression sourceExpression : sourceExpressions) {
+ for (Expression sourceExpression : analysis.getLastQueryBaseExpressions())
{
MeasurementPath outputPath =
(MeasurementPath)
(sourceExpression.isViewExpression()
@@ -238,46 +242,47 @@ public class LogicalPlanBuilder {
? new TreeMap<>(timeseriesOrdering.getStringComparator())
: new LinkedHashMap<>())
.put(outputPath.getMeasurement(), sourceExpression);
- if (!deviceAlignedMap.containsKey(outputDevice)) {
- deviceAlignedMap.put(outputDevice, outputPath.isUnderAlignedEntity());
+ if (outputPath.isUnderAlignedEntity()) {
+ deviceAlignedSet.add(outputDevice);
+ }
+ if (sourceExpression.isViewExpression()) {
+ deviceExistViewSet.add(outputDevice);
}
- deviceExistViewMap.put(
- outputDevice,
- deviceExistViewMap.getOrDefault(outputDevice, false)
- || sourceExpression.isViewExpression());
}
+ List<PlanNode> sourceNodeList = new ArrayList<>();
for (Map.Entry<String, Map<String, Expression>>
deviceMeasurementExpressionEntry :
outputPathToSourceExpressionMap.entrySet()) {
String outputDevice = deviceMeasurementExpressionEntry.getKey();
- if (deviceExistViewMap.get(outputDevice)) {
+ Map<String, Expression> measurementToExpressionsOfDevice =
+ deviceMeasurementExpressionEntry.getValue();
+ if (deviceExistViewSet.contains(outputDevice)) {
// exist view
- for (Expression sourceExpression :
deviceMeasurementExpressionEntry.getValue().values()) {
+ for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
(MeasurementPath) ((TimeSeriesOperand)
sourceExpression).getPath();
+ String outputViewPath =
+ sourceExpression.isViewExpression()
+ ? sourceExpression.getViewPath().getFullPath()
+ : null;
+
if (selectedPath.isUnderAlignedEntity()) { // aligned series
sourceNodeList.add(
new AlignedLastQueryScanNode(
context.getQueryId().genPlanNodeId(),
new AlignedPath(selectedPath),
- sourceExpression.isViewExpression()
- ? sourceExpression.getViewPath().getFullPath()
- : null));
+ outputViewPath));
} else { // non-aligned series
sourceNodeList.add(
new LastQueryScanNode(
- context.getQueryId().genPlanNodeId(),
- selectedPath,
- sourceExpression.isViewExpression()
- ? sourceExpression.getViewPath().getFullPath()
- : null));
+ context.getQueryId().genPlanNodeId(), selectedPath,
outputViewPath));
}
}
} else {
- if (deviceAlignedMap.get(outputDevice)) {
+ if (deviceAlignedSet.contains(outputDevice)) {
// aligned series
List<MeasurementPath> measurementPaths =
- deviceMeasurementExpressionEntry.getValue().values().stream()
+ measurementToExpressionsOfDevice.values().stream()
.map(expression -> (MeasurementPath) ((TimeSeriesOperand)
expression).getPath())
.collect(Collectors.toList());
AlignedPath alignedPath = new
AlignedPath(measurementPaths.get(0).getDevicePath());
@@ -289,7 +294,7 @@ public class LogicalPlanBuilder {
context.getQueryId().genPlanNodeId(), alignedPath, null));
} else {
// non-aligned series
- for (Expression sourceExpression :
deviceMeasurementExpressionEntry.getValue().values()) {
+ for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
(MeasurementPath) ((TimeSeriesOperand)
sourceExpression).getPath();
sourceNodeList.add(
@@ -299,12 +304,35 @@ public class LogicalPlanBuilder {
}
}
+ processLastQueryTransformNode(analysis, sourceNodeList, zoneId);
+
+ if (timeseriesOrdering != null) {
+ sourceNodeList.sort(
+ Comparator.comparing(
+ child -> {
+ String sortKey = "";
+ if (child instanceof LastQueryScanNode) {
+ sortKey = ((LastQueryScanNode)
child).getOutputSymbolForSort();
+ } else if (child instanceof AlignedLastQueryScanNode) {
+ sortKey = ((AlignedLastQueryScanNode)
child).getOutputSymbolForSort();
+ } else if (child instanceof LastQueryTransformNode) {
+ sortKey = ((LastQueryTransformNode)
child).getOutputSymbolForSort();
+ }
+ return sortKey;
+ }));
+ if (timeseriesOrdering.equals(Ordering.DESC)) {
+ Collections.reverse(sourceNodeList);
+ }
+ }
+
this.root =
new LastQueryNode(
context.getQueryId().genPlanNodeId(),
sourceNodeList,
- globalTimeFilter,
- timeseriesOrdering);
+ analysis.getGlobalTimeFilter(),
+ timeseriesOrdering,
+ analysis.getLastQueryNonWritableViewSourceExpressionMap() != null);
+
ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
columnHeader ->
context
@@ -314,6 +342,56 @@ public class LogicalPlanBuilder {
return this;
}
+ /**
+  * Appends one {@link LastQueryTransformNode} to {@code sourceNodeList} for every entry of the
+  * analysis' non-writable-view source expression map; does nothing when that map is null.
+  *
+  * <p>Each entry gets its own sub-plan built in descending order: raw data source over the
+  * entry's source expressions, source transform for the view expression, then a single-step
+  * aggregation of MAX_TIME and LAST_VALUE over the view expression.
+  *
+  * @param analysis supplies the view map, time filter and group-by parameters, and receives the
+  *     types of the expressions analyzed here
+  * @param sourceNodeList list the new transform nodes are appended to
+  * @param zoneId zone id handed to the source transform step
+  */
+ private void processLastQueryTransformNode(
+     Analysis analysis, List<PlanNode> sourceNodeList, ZoneId zoneId) {
+   Map<Expression, List<Expression>> viewToSources =
+       analysis.getLastQueryNonWritableViewSourceExpressionMap();
+   if (viewToSources == null) {
+     return;
+   }
+
+   for (Map.Entry<Expression, List<Expression>> viewEntry : viewToSources.entrySet()) {
+     Expression viewExpression = viewEntry.getKey();
+     Set<Expression> rawSources = new LinkedHashSet<>(viewEntry.getValue());
+     Set<Expression> transformTargets = Collections.singleton(viewExpression);
+
+     FunctionExpression maxTime =
+         new FunctionExpression(
+             MAX_TIME, new LinkedHashMap<>(), Collections.singletonList(viewExpression));
+     FunctionExpression lastValue =
+         new FunctionExpression(
+             LAST_VALUE, new LinkedHashMap<>(), Collections.singletonList(viewExpression));
+
+     // types must be registered before the sub-plan and analysis.getType(...) below use them
+     analyzeExpression(analysis, viewExpression);
+     analyzeExpression(analysis, maxTime);
+     analyzeExpression(analysis, lastValue);
+
+     LogicalPlanBuilder viewPlan =
+         new LogicalPlanBuilder(analysis, context)
+             .planRawDataSource(
+                 rawSources,
+                 Ordering.DESC,
+                 analysis.getGlobalTimeFilter(),
+                 analysis.isLastLevelUseWildcard())
+             .planWhereAndSourceTransform(null, transformTargets, false, zoneId, Ordering.DESC)
+             .planAggregation(
+                 new LinkedHashSet<>(Arrays.asList(maxTime, lastValue)),
+                 null,
+                 analysis.getGroupByTimeParameter(),
+                 analysis.getGroupByParameter(),
+                 false,
+                 AggregationStep.SINGLE,
+                 Ordering.DESC);
+
+     sourceNodeList.add(
+         new LastQueryTransformNode(
+             context.getQueryId().genPlanNodeId(),
+             viewPlan.getRoot(),
+             viewExpression.getViewPath().getFullPath(),
+             analysis.getType(viewExpression).toString()));
+   }
+ }
+
+
public LogicalPlanBuilder planAggregationSource(
AggregationStep curStep,
Ordering scanOrder,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d5180aacf46..9a2d72dda40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.StringUtils;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -127,9 +128,9 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
planBuilder =
planBuilder
.planLast(
- analysis.getSourceExpressions(),
- analysis.getGlobalTimeFilter(),
- analysis.getTimeseriesOrderingForLastQuery())
+ analysis,
+ analysis.getTimeseriesOrderingForLastQuery(),
+ queryStatement.getSelectComponent().getZoneId())
.planOffset(queryStatement.getRowOffset())
.planLimit(queryStatement.getRowLimit());
@@ -593,13 +594,14 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
showTimeSeriesStatement.isPrefixPath(),
analysis.getRelatedTemplateInfo())
.planSchemaQueryMerge(showTimeSeriesStatement.isOrderByHeat());
+
// show latest timeseries
if (showTimeSeriesStatement.isOrderByHeat()
&& null != analysis.getDataPartitionInfo()
&& 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
PlanNode lastPlanNode =
new LogicalPlanBuilder(analysis, context)
- .planLast(analysis.getSourceExpressions(),
analysis.getGlobalTimeFilter(), null)
+ .planLast(analysis, null, ZoneId.systemDefault())
.getRoot();
planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 3585b301e10..628d8ba879a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -92,13 +92,13 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.Mul
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateLastCacheOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateViewPathLastCacheOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQuerySortOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryTransformOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateLastCacheOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateViewPathLastCacheOperator;
@@ -179,6 +179,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -2020,8 +2021,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
public Operator visitLastQueryScan(LastQueryScanNode node,
LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
TimeValuePair timeValuePair = null;
+ context.dataNodeQueryContext.lock();
try {
- context.dataNodeQueryContext.lock();
if (!context.dataNodeQueryContext.unCached(seriesPath)) {
timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) {
@@ -2279,16 +2280,14 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext
context) {
-
context.setLastQueryTimeFilter(node.getTimeFilter());
context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
context.setNeedUpdateNullEntry(LastQueryUtil.needUpdateNullEntry(node.getTimeFilter()));
- List<AbstractUpdateLastCacheOperator> operatorList =
+ List<Operator> operatorList =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.filter(Objects::nonNull)
- .map(o -> (AbstractUpdateLastCacheOperator) o)
.collect(Collectors.toList());
List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
@@ -2390,6 +2389,23 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
return new LastQueryCollectOperator(operatorContext, children);
}
+ /**
+  * Builds the operator for a {@link LastQueryTransformNode}: the node's child is translated
+  * first, then wrapped in a {@link LastQueryTransformOperator} carrying the node's view path and
+  * data type.
+  *
+  * @param node plan node whose single child yields the blocks to transform
+  * @param context local execution plan context used to allocate the operator id and context
+  * @return the new {@link LastQueryTransformOperator}
+  */
+ @Override
+ public Operator visitLastQueryTransform(
+     LastQueryTransformNode node, LocalExecutionPlanContext context) {
+   Operator operator = node.getChild().accept(this, context);
+   OperatorContext operatorContext =
+       context
+           .getDriverContext()
+           .addOperatorContext(
+               context.getNextOperatorId(),
+               node.getPlanNodeId(),
+               // register under the operator type that is actually instantiated below
+               LastQueryTransformOperator.class.getSimpleName());
+
+   context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+   return new LastQueryTransformOperator(
+       node.getViewPath(), node.getDataType(), operatorContext, operator);
+ }
+
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index 12b083c9af9..b8920cffbc7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
@@ -128,19 +129,33 @@ public class SubPlanTypeExtractor {
@Override
public Void visitLastQuery(LastQueryNode node, Void context) {
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
@Override
public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) {
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
@Override
public Void visitLastQueryCollect(LastQueryCollectNode node, Void context)
{
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
+ @Override
+ public Void visitLastQueryTransform(LastQueryTransformNode node, Void
context) {
+ return visitPlan(node, context);
+ }
+
// end region PlanNode of last read
private void updateTypeProviderByAggregationDescriptor(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index 87efdf508b7..ce6b25c26dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -41,7 +41,6 @@ public class DistributionPlanContext {
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
this.queryContext = queryContext;
- this.forceAddParent = false;
}
protected DistributionPlanContext copy() {
@@ -53,8 +52,8 @@ public class DistributionPlanContext {
return this;
}
- protected void setForceAddParent(boolean forceAddParent) {
- this.forceAddParent = forceAddParent;
+ protected void setForceAddParent() {
+ this.forceAddParent = true;
}
public void setOneSeriesInMultiRegion(boolean oneSeriesInMultiRegion) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 2f14ffcd112..f0e016a5e52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -81,11 +81,7 @@ public class DistributionPlanner {
public PlanNode addExchangeNode(PlanNode root) {
ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis);
NodeGroupContext nodeGroupContext =
- new NodeGroupContext(
- context,
- analysis.getStatement() instanceof QueryStatement
- && (((QueryStatement)
analysis.getStatement()).isAlignByDevice()),
- root);
+ new NodeGroupContext(context, analysis.getStatement(), root);
PlanNode newRoot = adder.visit(root, nodeGroupContext);
adjustUpStream(newRoot, nodeGroupContext);
return newRoot;
@@ -196,7 +192,7 @@ public class DistributionPlanner {
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
// Only execute this step for READ operation
if (context.getQueryType() == QueryType.READ) {
- SetSinkForRootInstance(subPlan, fragmentInstances);
+ setSinkForRootInstance(subPlan, fragmentInstances);
}
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(),
fragmentInstances);
@@ -213,7 +209,7 @@ public class DistributionPlanner {
}
// TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
- public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance>
instances) {
+ public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance>
instances) {
FragmentInstance rootInstance = null;
for (FragmentInstance instance : instances) {
if
(instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index b031ca9947e..779f101a52b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -58,8 +59,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -106,11 +105,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
private PlanNode internalVisitSchemaMerge(
AbstractSchemaMergeNode node, NodeGroupContext context) {
- node.getChildren()
- .forEach(
- child -> {
- visit(child, context);
- });
+ node.getChildren().forEach(child -> visit(child, context));
NodeDistribution nodeDistribution =
new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
PlanNode newNode = node.clone();
@@ -229,6 +224,11 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitLastQueryTransform(LastQueryTransformNode node,
NodeGroupContext context) {
+ return processOneChildNode(node, context);
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
@@ -281,46 +281,38 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
}
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
- List<PlanNode> visitedChildren = new ArrayList<>();
- node.getChildren()
- .forEach(
- child -> {
- visitedChildren.add(visit(child, context));
- });
+ List<PlanNode> visitedChildren =
+ node.getChildren().stream()
+ .map(child -> visit(child, context))
+ .collect(Collectors.toList());
TRegionReplicaSet dataRegion;
- NodeDistributionType distributionType;
+ boolean isChildrenDistributionSame =
nodeDistributionIsSame(visitedChildren, context);
+ NodeDistributionType distributionType =
+ isChildrenDistributionSame
+ ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+ : NodeDistributionType.SAME_WITH_SOME_CHILD;
if (context.isAlignByDevice()) {
// For align by device,
// if dataRegions of children are the same, we set child's dataRegion to
this node,
// else we set the selected mostlyUsedDataRegion to this node
- boolean inSame = nodeDistributionIsSame(visitedChildren, context);
dataRegion =
- inSame
+ isChildrenDistributionSame
?
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
- newNode.getPlanNodeId(),
- new NodeDistribution(
- inSame
- ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
- : NodeDistributionType.SAME_WITH_SOME_CHILD,
- dataRegion));
+ newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
} else {
// TODO For align by time, we keep old logic for now
dataRegion = calculateDataRegionByChildren(visitedChildren, context);
- distributionType =
- nodeDistributionIsSame(visitedChildren, context)
- ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
- : NodeDistributionType.SAME_WITH_SOME_CHILD;
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
+ }
- // If the distributionType of all the children are same, no ExchangeNode
need to be added.
- if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
- newNode.setChildren(visitedChildren);
- return newNode;
- }
+ // If the distributionType of all the children are same, no ExchangeNode
need to be added.
+ if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+ newNode.setChildren(visitedChildren);
+ return newNode;
}
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
@@ -381,6 +373,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
private TRegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
+
// Step 1: calculate the count of children group by DataRegion.
Map<TRegionReplicaSet, Long> groupByRegion =
children.stream()
@@ -397,16 +390,27 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return region;
},
Collectors.counting()));
- if (groupByRegion.entrySet().size() == 1) {
- return groupByRegion.entrySet().iterator().next().getKey();
+
+ if (groupByRegion.size() == 1) {
+ return groupByRegion.keySet().iterator().next();
}
+
// Step 2: return the RegionReplicaSet with max count
- return Collections.max(
- groupByRegion.entrySet().stream()
- .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED)
- .collect(Collectors.toList()),
- Map.Entry.comparingByValue())
- .getKey();
+ long maxRegionCount = -1;
+ TRegionReplicaSet result = null;
+ for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
+ if (DataPartition.NOT_ASSIGNED.equals(entry.getKey())) {
+ continue;
+ }
+ if (entry.getKey().equals(context.getMostlyUsedDataRegion())) {
+ return entry.getKey();
+ }
+ if (entry.getValue() > maxRegionCount) {
+ maxRegionCount = entry.getValue();
+ result = entry.getKey();
+ }
+ }
+ return result;
}
private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 14799292714..9ea36e7994c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -25,23 +25,31 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class NodeGroupContext {
+
protected final MPPQueryContext queryContext;
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
- private final boolean isAlignByDevice;
- private final TRegionReplicaSet mostlyUsedDataRegion;
+ private boolean isAlignByDevice;
+ private TRegionReplicaSet mostlyUsedDataRegion;
protected boolean hasExchangeNode;
- public NodeGroupContext(MPPQueryContext queryContext, boolean
isAlignByDevice, PlanNode root) {
+ public NodeGroupContext(MPPQueryContext queryContext, Statement statement,
PlanNode root) {
this.queryContext = queryContext;
this.nodeDistributionMap = new HashMap<>();
- this.isAlignByDevice = isAlignByDevice;
- this.mostlyUsedDataRegion = isAlignByDevice ?
getMostlyUsedDataRegion(root) : null;
+ if (statement instanceof QueryStatement) {
+ this.isAlignByDevice = ((QueryStatement) statement).isAlignByDevice();
+ this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+ } else if (statement instanceof ShowTimeSeriesStatement) {
+ this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+ }
this.hasExchangeNode = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 2bf79e2a113..ff24932bca6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -183,7 +184,9 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
});
if (analysis.getStatement() instanceof QueryStatement
- || analysis.getStatement() instanceof ShowQueriesStatement) {
+ || analysis.getStatement() instanceof ShowQueriesStatement
+ || (analysis.getStatement() instanceof ShowTimeSeriesStatement
+ && ((ShowTimeSeriesStatement)
analysis.getStatement()).isOrderByHeat())) {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 5fd9c4972a6..ec481a98566 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -63,7 +63,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
import java.util.Collections;
@@ -480,7 +479,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
LastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
- context.queryContext.getQueryId().genPlanNodeId(),
node.getPartitionTimeFilter(), null);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getPartitionTimeFilter(),
+ null,
+ false);
return processRawSeriesScan(node, context, mergeNode);
}
@@ -489,7 +491,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
AlignedLastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
- context.queryContext.getQueryId().genPlanNodeId(),
node.getPartitionTimeFilter(), null);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getPartitionTimeFilter(),
+ null,
+ false);
return processRawSeriesScan(node, context, mergeNode);
}
@@ -619,8 +624,8 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
public List<PlanNode> visitLastQuery(LastQueryNode node,
DistributionPlanContext context) {
// For last query, we need to keep every FI's root node as
LastQueryMergeNode. So we
// force every region group to have a parent node even if there is only 1
child for it.
- context.setForceAddParent(true);
- PlanNode root = processRawMultiChildNode(node, context);
+ context.setForceAddParent();
+ PlanNode root = processRawMultiChildNode(node, context, true);
if (context.queryMultiRegion) {
PlanNode newRoot = genLastQueryRootNode(node, context);
// add sort op for each if we add LastQueryMergeNode as root
@@ -678,9 +683,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// if the series is from multi regions or order by clause only refer to
timeseries, use
// LastQueryMergeNode
if (context.oneSeriesInMultiRegion || node.needOrderByTimeseries()) {
- return new LastQueryMergeNode(id, node.getTimeseriesOrdering());
+ return new LastQueryMergeNode(
+ id, node.getTimeseriesOrdering(),
node.isContainsLastTransformNode());
}
- return new LastQueryCollectNode(id);
+ return new LastQueryCollectNode(id, node.isContainsLastTransformNode());
}
@Override
@@ -691,11 +697,11 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (containsAggregationSource(node)) {
return planAggregationWithTimeJoin(node, context);
}
- return Collections.singletonList(processRawMultiChildNode(node, context));
+ return Collections.singletonList(processRawMultiChildNode(node, context,
false));
}
private PlanNode processRawMultiChildNode(
- MultiChildProcessNode node, DistributionPlanContext context) {
+ MultiChildProcessNode node, DistributionPlanContext context, boolean
isLast) {
MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
// Step 1: Get all source nodes. For the node which is not source, add it
as the child of
// current TimeJoinNode
@@ -704,9 +710,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (child instanceof SeriesSourceNode) {
// If the child is SeriesScanNode, we need to check whether this node
should be separated
// into several splits.
- SeriesSourceNode handle = (SeriesSourceNode) child;
+ SeriesSourceNode sourceNode = (SeriesSourceNode) child;
List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(handle.getPartitionPath(),
handle.getPartitionTimeFilter());
+ analysis.getPartitionInfo(
+ sourceNode.getPartitionPath(),
sourceNode.getPartitionTimeFilter());
if (dataDistribution.size() > 1) {
// We mark this variable to `true` if there is some series which is
distributed in multi
// DataRegions
@@ -715,17 +722,17 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// If the size of dataDistribution is m, this SeriesScanNode should be
separated into m
// SeriesScanNode.
for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesSourceNode split = (SeriesSourceNode) handle.clone();
+ SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
}
}
+
// Step 2: For the source nodes, group them by the DataRegion.
Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
if (sourceGroup.size() > 1) {
context.setQueryMultiRegion(true);
}
@@ -734,35 +741,31 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
// TODO: (xingtanzjr) optimize the procedure here to remove duplicated
TimeJoinNode
- final boolean[] addParent = {false};
- sourceGroup.forEach(
- (dataRegion, seriesScanNodes) -> {
- if (seriesScanNodes.size() == 1 && !context.forceAddParent) {
- root.addChild(seriesScanNodes.get(0));
- } else {
- // If there is only one RegionGroup here, we should not create new
MultiChildNode as the
- // parent.
- // If the size of RegionGroup is larger than 1, we need to
consider the value of
- // `forceAddParent`.
- // If `forceAddParent` is true, we should not create new
MultiChildNode as the parent,
- // either.
- // At last, we can use the parameter `addParent[0]` to judge
whether to create new
- // MultiChildNode.
- boolean appendToRootDirectly =
- sourceGroup.size() == 1 || (!addParent[0] &&
!context.forceAddParent);
- if (appendToRootDirectly) {
- seriesScanNodes.forEach(root::addChild);
- addParent[0] = true;
- } else {
- // We clone a TimeJoinNode from root to make the params to be
consistent.
- // But we need to assign a new ID to it
- MultiChildProcessNode parentOfGroup = (MultiChildProcessNode)
root.clone();
-
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- seriesScanNodes.forEach(parentOfGroup::addChild);
- root.addChild(parentOfGroup);
- }
- }
- });
+ boolean addParent = false;
+ for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+ if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast))
{
+ root.addChild(seriesScanNodes.get(0));
+ continue;
+ }
+ // If size of RegionGroup = 1, we should not create new MultiChildNode
as the parent.
+ // If size of RegionGroup > 1, we need to consider the value of
`forceAddParent`.
+ // If `forceAddParent` is false, the first such group is added to root
directly, without a new parent.
+ // At last, we can use the parameter `addParent` to judge whether to
create new
+ // MultiChildNode.
+ boolean appendToRootDirectly =
+ sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+ if (appendToRootDirectly) {
+ seriesScanNodes.forEach(root::addChild);
+ addParent = true;
+ } else {
+ // We clone a TimeJoinNode from root to make the params to be
consistent.
+ // But we need to assign a new ID to it
+ MultiChildProcessNode parentOfGroup = (MultiChildProcessNode)
root.clone();
+
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ seriesScanNodes.forEach(parentOfGroup::addChild);
+ root.addChild(parentOfGroup);
+ }
+ }
// Process the other children which are not SeriesSourceNode
for (PlanNode child : node.getChildren()) {
@@ -777,10 +780,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return root;
}
- private boolean isAggregationQuery() {
- return ((QueryStatement) analysis.getStatement()).isAggregationQuery();
- }
-
private boolean containsAggregationSource(TimeJoinNode node) {
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesAggregationScanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
index 38741d91e0e..0bceb53eb5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
@@ -60,10 +60,7 @@ public class SubPlan {
public List<PlanFragment> getPlanFragmentList() {
List<PlanFragment> result = new ArrayList<>();
result.add(this.planFragment);
- this.children.forEach(
- child -> {
- result.addAll(child.getPlanFragmentList());
- });
+ this.children.forEach(child -> result.addAll(child.getPlanFragmentList()));
return result;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index ec63414d2f7..820a41352aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -423,6 +424,14 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ @Override
+ public List<String> visitLastQueryTransform(LastQueryTransformNode node,
GraphContext context) {
+ List<String> boxValue = new ArrayList<>();
+ boxValue.add(String.format("LastQueryTransform-%s",
node.getPlanNodeId().getId()));
+ boxValue.add(String.format("ViewPath: %s", node.getViewPath()));
+ return render(node, boxValue, context);
+ }
+
@Override
public List<String> visitHorizontallyConcat(HorizontallyConcatNode node,
GraphContext context) {
List<String> boxValue = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 2364aef9d7e..98be8a79971 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -78,6 +78,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -180,7 +181,8 @@ public enum PlanNodeType {
LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
ALTER_LOGICAL_VIEW((short) 78),
PIPE_ENRICHED_INSERT((short) 79),
- ;
+ FORECAST((short) 80),
+ LAST_QUERY_TRANSFORM((short) 81);
public static final int BYTES = Short.BYTES;
@@ -385,6 +387,8 @@ public enum PlanNodeType {
return AlterLogicalViewNode.deserialize(buffer);
case 79:
return PipeEnrichedInsertNode.deserialize(buffer);
+ case 81:
+ return LastQueryTransformNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index e6eb22fc5a2..965e83beb15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -78,6 +78,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -234,6 +235,10 @@ public abstract class PlanVisitor<R, C> {
return visitMultiChildProcess(node, context);
}
+ public R visitLastQueryTransform(LastQueryTransformNode node, C context) {
+ return visitSingleChildProcess(node, context);
+ }
+
public R visitMergeSort(MergeSortNode node, C context) {
return visitMultiChildProcess(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index e38b851e8dd..fbba5c6d336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -34,8 +34,11 @@ import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.Last
public class LastQueryCollectNode extends MultiChildProcessNode {
- public LastQueryCollectNode(PlanNodeId id) {
+ private boolean containsLastTransformNode;
+
+ public LastQueryCollectNode(PlanNodeId id, boolean
containsLastTransformNode) {
super(id);
+ this.containsLastTransformNode = containsLastTransformNode;
}
public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
@@ -54,7 +57,7 @@ public class LastQueryCollectNode extends
MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryCollectNode(getPlanNodeId());
+ return new LastQueryCollectNode(getPlanNodeId(),
containsLastTransformNode);
}
@Override
@@ -99,11 +102,19 @@ public class LastQueryCollectNode extends
MultiChildProcessNode {
public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryCollectNode(planNodeId);
+ return new LastQueryCollectNode(planNodeId, false);
}
@Override
public void setChildren(List<PlanNode> children) {
this.children = children;
}
+
+ public boolean isContainsLastTransformNode() {
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsLastQueryTransformNode() {
+ this.containsLastTransformNode = true;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 3c240f48903..7bdc4b245b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -40,14 +40,14 @@ public class LastQueryMergeNode extends
MultiChildProcessNode {
// The size of this list is 2 and the first SortItem in this list has higher
priority.
private final Ordering timeseriesOrdering;
- public LastQueryMergeNode(PlanNodeId id, Ordering timeseriesOrdering) {
- super(id);
- this.timeseriesOrdering = timeseriesOrdering;
- }
+ // if children contains LastTransformNode
+ private boolean containsLastTransformNode;
- public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Ordering
timeseriesOrdering) {
- super(id, children);
+ public LastQueryMergeNode(
+ PlanNodeId id, Ordering timeseriesOrdering, boolean
containsLastTransformNode) {
+ super(id);
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
@Override
@@ -62,7 +62,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode
{
@Override
public PlanNode clone() {
- return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering);
+ return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering,
containsLastTransformNode);
}
@Override
@@ -135,7 +135,7 @@ public class LastQueryMergeNode extends
MultiChildProcessNode {
timeseriesOrdering =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryMergeNode(planNodeId, timeseriesOrdering);
+ return new LastQueryMergeNode(planNodeId, timeseriesOrdering, false);
}
@Override
@@ -146,4 +146,12 @@ public class LastQueryMergeNode extends
MultiChildProcessNode {
public Ordering getTimeseriesOrdering() {
return timeseriesOrdering;
}
+
+ public boolean isContainsLastTransformNode() {
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsNonWritableView() {
+ this.containsLastTransformNode = true;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
index a47810a0076..0d6ade61aad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -46,20 +46,30 @@ public class LastQueryNode extends MultiChildProcessNode {
// which is set to null if there is no need to sort
private Ordering timeseriesOrdering;
- public LastQueryNode(PlanNodeId id, Filter timeFilter, @Nullable Ordering
timeseriesOrdering) {
+ // if children contains LastTransformNode, this variable is only used in
distribute plan
+ private boolean containsLastTransformNode;
+
+ public LastQueryNode(
+ PlanNodeId id,
+ Filter timeFilter,
+ @Nullable Ordering timeseriesOrdering,
+ boolean containsLastTransformNode) {
super(id);
this.timeFilter = timeFilter;
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
public LastQueryNode(
PlanNodeId id,
List<PlanNode> children,
Filter timeFilter,
- @Nullable Ordering timeseriesOrdering) {
+ @Nullable Ordering timeseriesOrdering,
+ boolean containsLastTransformNode) {
super(id, children);
this.timeFilter = timeFilter;
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
@Override
@@ -74,7 +84,8 @@ public class LastQueryNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryNode(getPlanNodeId(), timeFilter, timeseriesOrdering);
+ return new LastQueryNode(
+ getPlanNodeId(), timeFilter, timeseriesOrdering,
containsLastTransformNode);
}
@Override
@@ -163,7 +174,7 @@ public class LastQueryNode extends MultiChildProcessNode {
timeseriesOrdering =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering);
+ return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering,
false);
}
@Override
@@ -184,6 +195,14 @@ public class LastQueryNode extends MultiChildProcessNode {
this.timeseriesOrdering = timeseriesOrdering;
}
+ public boolean isContainsLastTransformNode() {
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsLastTransformNode() {
+ this.containsLastTransformNode = true;
+ }
+
public boolean needOrderByTimeseries() {
return timeseriesOrdering != null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
index e38b851e8dd..108189259e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
@@ -16,13 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,78 +34,96 @@ import java.util.Objects;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryCollectNode extends MultiChildProcessNode {
+public class LastQueryTransformNode extends SingleChildProcessNode {
+
+ private final String viewPath;
- public LastQueryCollectNode(PlanNodeId id) {
+ private final String dataType;
+
+ public LastQueryTransformNode(PlanNodeId id, String viewPath, String
dataType) {
super(id);
+ this.viewPath = viewPath;
+ this.dataType = dataType;
}
- public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
- super(id, children);
+ public LastQueryTransformNode(PlanNodeId id, PlanNode aggNode, String
viewPath, String dataType) {
+ super(id, aggNode);
+ this.viewPath = viewPath;
+ this.dataType = dataType;
}
@Override
- public List<PlanNode> getChildren() {
- return children;
+ public PlanNode clone() {
+ return new LastQueryTransformNode(getPlanNodeId(), viewPath, dataType);
}
@Override
- public void addChild(PlanNode child) {
- children.add(child);
+ public List<String> getOutputColumnNames() {
+ return LAST_QUERY_HEADER_COLUMNS;
}
@Override
- public PlanNode clone() {
- return new LastQueryCollectNode(getPlanNodeId());
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.LAST_QUERY_TRANSFORM.serialize(byteBuffer);
+ ReadWriteIOUtils.write(viewPath, byteBuffer);
+ ReadWriteIOUtils.write(dataType, byteBuffer);
}
@Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.LAST_QUERY_TRANSFORM.serialize(stream);
+ ReadWriteIOUtils.write(viewPath, stream);
+ ReadWriteIOUtils.write(dataType, stream);
}
- @Override
- public List<String> getOutputColumnNames() {
- return LAST_QUERY_HEADER_COLUMNS;
+ public static LastQueryTransformNode deserialize(ByteBuffer byteBuffer) {
+ String viewPath = ReadWriteIOUtils.readString(byteBuffer);
+ String dataType = ReadWriteIOUtils.readString(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new LastQueryTransformNode(planNodeId, viewPath, dataType);
}
@Override
- public String toString() {
- return String.format("LastQueryCollectNode-%s", this.getPlanNodeId());
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLastQueryTransform(this, context);
}
@Override
public boolean equals(Object o) {
- return super.equals(o);
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ LastQueryTransformNode that = (LastQueryTransformNode) o;
+ return viewPath.equals(that.viewPath) && dataType.equals(that.dataType);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode());
- }
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitLastQueryCollect(this, context);
+ return Objects.hash(super.hashCode(), viewPath, dataType);
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.LAST_QUERY_COLLECT.serialize(byteBuffer);
+ public String toString() {
+ return String.format(
+ "LastQueryTransformNode-%s:[ViewPath: %s, DataType: %s]",
+ this.getPlanNodeId(), viewPath, dataType);
}
- @Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.LAST_QUERY_COLLECT.serialize(stream);
+ public String getViewPath() {
+ return this.viewPath;
}
- public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
- PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryCollectNode(planNodeId);
+ public String getDataType() {
+ return this.dataType;
}
- @Override
- public void setChildren(List<PlanNode> children) {
- this.children = children;
+ public String getOutputSymbolForSort() {
+ return viewPath;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
index 7b7b5e8b500..b2197609e84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -87,6 +87,11 @@ public class IdentitySinkNode extends MultiChildrenSinkNode {
}
}
+ @Override
+ public String toString() {
+ return String.format("IdentitySinkNode-%s", this.getPlanNodeId());
+ }
+
public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
int size = ReadWriteIOUtils.readInt(byteBuffer);
List<DownStreamChannelLocation> downStreamChannelLocationList = new
ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 28cecd5f4e1..638c1c81e3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -55,10 +55,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode
{
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
private Ordering scanOrder = Ordering.ASC;
- // time filter for current series, could be null if doesn't exist
+ // time filter for current series, could be null if it doesn't exist
@Nullable private Filter timeFilter;
- // value filter for current series, could be null if doesn't exist
+ // value filter for current series, could be null if it doesn't exist
@Nullable private Filter valueFilter;
// Limit for result set. The default value is -1, which means no limit
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 539d90bc84d..0231def964d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -48,7 +48,6 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill;
import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.LinearFill;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.HorizontallyConcatOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.RowBasedTimeJoinOperator;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
@@ -346,7 +345,7 @@ public class OperatorMemoryTest {
public void lastQueryOperatorTest() {
TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
- List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<Operator> children = new ArrayList<>(4);
long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child =
Mockito.mock(UpdateLastCacheOperator.class);
@@ -376,7 +375,7 @@ public class OperatorMemoryTest {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
- List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<Operator> children = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child =
Mockito.mock(UpdateLastCacheOperator.class);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
index b36feab3cff..87ffe9baca7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
@@ -143,7 +143,7 @@ public class QueryLogicalPlanUtil {
LastQueryNode lastQueryNode =
new LastQueryNode(
- queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100),
Ordering.ASC);
+ queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100),
Ordering.ASC, false);
querySQLs.add(sql);
sqlToPlanMap.put(sql, lastQueryNode);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
index db875ef9b38..13d1586d475 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
@@ -209,7 +209,7 @@ public class LastQueryTest {
}
PlanNode root =
- new LastQueryNode(context.getQueryId().genPlanNodeId(),
sourceNodeList, null, null);
+ new LastQueryNode(context.getQueryId().genPlanNodeId(),
sourceNodeList, null, null, false);
return new LogicalQueryPlan(context, root);
}
}