This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/topk by this push:
new 9dd4718a05b add impl
9dd4718a05b is described below
commit 9dd4718a05b6f4fdf0b4d8c81735f17609066dc5
Author: Beyyes <[email protected]>
AuthorDate: Thu Jul 18 18:14:37 2024 +0800
add impl
---
.../plan/planner/TableOperatorGenerator.java | 2 +-
.../plan/relational/planner/QueryPlanner.java | 6 +-
.../distribute/DistributedPlanGenerator.java | 50 +++++++++--
.../distribute/TableDistributionPlanner.java | 3 +-
.../TableModelTypeProviderExtractor.java | 7 ++
.../rule/MergeLimitOverProjectWithSort.java | 98 ++++++++++++++++++++++
.../planner/iterative/rule/MergeLimitWithSort.java | 60 +++++++++++++
.../iterative/rule/PruneTableScanColumns.java | 5 +-
.../planner/iterative/rule/PruneTopKColumns.java | 31 +++++++
.../plan/relational/planner/node/Patterns.java | 9 +-
.../plan/relational/planner/node/SortNode.java | 31 +++++--
.../relational/planner/node/StreamSortNode.java | 10 ++-
.../relational/planner/node/TableScanNode.java | 30 ++++++-
.../plan/relational/planner/node/TopKNode.java | 8 +-
.../planner/optimizations/OptimizeFactory.java | 25 ++++--
.../PushLimitOffsetIntoTableScan.java | 16 +++-
.../optimizations/TransformSortToStreamSort.java | 9 ++
.../analyzer/LimitOffsetPushDownTest.java | 58 ++++++++-----
.../plan/relational/analyzer/SortTest.java | 84 ++++---------------
.../plan/relational/analyzer/TestPlanBuilder.java | 10 ++-
20 files changed, 418 insertions(+), 134 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 0a6c26634da..54faac3d78d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -653,7 +653,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
dataTypes,
getComparatorForTable(
node.getOrderingScheme().getOrderingList(), sortItemIndexList,
sortItemDataTypeList),
- node.getCount(),
+ (int) node.getCount(),
node.isChildrenDataInOrder());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index a9c094d7d9a..0a49d7c0492 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -279,7 +279,11 @@ public class QueryPlanner {
return subPlan.withNewRoot(
new SortNode(
- queryIdAllocator.genPlanNodeId(), subPlan.getRoot(),
orderingScheme.get(), false));
+ queryIdAllocator.genPlanNodeId(),
+ subPlan.getRoot(),
+ orderingScheme.get(),
+ false,
+ false));
}
private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index f37c384e4d4..4c4e3cf1935 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -64,6 +65,7 @@ import static
org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaValidator.parseDeviceIdArray;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.tsfile.utils.Preconditions.checkArgument;
/** This class is used to generate distributed plan for table model. */
public class DistributedPlanGenerator
@@ -193,10 +195,42 @@ public class DistributedPlanGenerator
return resultNodeList;
}
+  /**
+   * Builds the distributed form of a logical TopKNode.
+   *
+   * <p>The node's ordering scheme is recorded in the context and in {@code nodeOrderingMap}
+   * before its single child is visited. If the child expands to exactly one node, the original
+   * TopKNode is re-parented onto it. Otherwise every expanded child is wrapped in its own
+   * TopKNode (same ordering scheme, count and output symbols) and those are added as children
+   * of a clone of the original node.
+   *
+   * @param node the logical TopKNode; must have exactly one child
+   * @param context planning context shared across the visit
+   * @return a singleton list holding the resulting TopKNode
+   */
+  @Override
+  public List<PlanNode> visitTopK(TopKNode node, PlanContext context) {
+    context.expectedOrderingScheme = node.getOrderingScheme();
+    context.hasSortProperty = true;
+    nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+
+    checkArgument(
+        node.getChildren().size() == 1,
+        "TopKNode must have exactly one child in logical plan.");
+    List<PlanNode> childrenNodes = node.getChildren().get(0).accept(this, context);
+    if (childrenNodes.size() == 1) {
+      node.setChildren(Collections.singletonList(childrenNodes.get(0)));
+      return Collections.singletonList(node);
+    }
+
+    TopKNode newTopKNode = (TopKNode) node.clone();
+    for (PlanNode child : childrenNodes) {
+      TopKNode subTopKNode =
+          new TopKNode(
+              queryId.genPlanNodeId(),
+              Collections.singletonList(child),
+              node.getOrderingScheme(),
+              node.getCount(),
+              node.getOutputSymbols(),
+              node.isChildrenDataInOrder());
+      newTopKNode.addChild(subTopKNode);
+    }
+    nodeOrderingMap.put(newTopKNode.getPlanNodeId(), newTopKNode.getOrderingScheme());
+
+    return Collections.singletonList(newTopKNode);
+  }
+
@Override
public List<PlanNode> visitSort(SortNode node, PlanContext context) {
context.expectedOrderingScheme = node.getOrderingScheme();
- context.hasSortNode = true;
+ context.hasSortProperty = true;
nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -211,7 +245,7 @@ public class DistributedPlanGenerator
queryId.genPlanNodeId(), node.getOrderingScheme(),
node.getOutputSymbols());
for (PlanNode child : childrenNodes) {
SortNode subSortNode =
- new SortNode(queryId.genPlanNodeId(), child,
node.getOrderingScheme(), false);
+ new SortNode(queryId.genPlanNodeId(), child,
node.getOrderingScheme(), false, false);
mergeSortNode.addChild(subSortNode);
}
nodeOrderingMap.put(mergeSortNode.getPlanNodeId(),
mergeSortNode.getOrderingScheme());
@@ -222,7 +256,7 @@ public class DistributedPlanGenerator
@Override
public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext
context) {
context.expectedOrderingScheme = node.getOrderingScheme();
- context.hasSortNode = true;
+ context.hasSortProperty = true;
nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -242,6 +276,7 @@ public class DistributedPlanGenerator
child,
node.getOrderingScheme(),
false,
+ node.isOrderByAllIdsAndTime(),
node.getStreamCompareKeyEndIndex());
mergeSortNode.addChild(subSortNode);
}
@@ -309,7 +344,10 @@ public class DistributedPlanGenerator
node.getIdAndAttributeIndexMap(),
node.getScanOrder(),
node.getTimePredicate().orElse(null),
- node.getPushDownPredicate());
+ node.getPushDownPredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
@@ -335,7 +373,7 @@ public class DistributedPlanGenerator
}
context.mostUsedDataRegion = mostUsedDataRegion;
- if (!context.hasSortNode) {
+ if (!context.hasSortProperty) {
return resultTableScanNodeList;
}
@@ -531,7 +569,7 @@ public class DistributedPlanGenerator
public static class PlanContext {
final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
boolean hasExchangeNode = false;
- boolean hasSortNode = false;
+ boolean hasSortProperty = false;
OrderingScheme expectedOrderingScheme;
TRegionReplicaSet mostUsedDataRegion;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 04caab49710..4a0de5ccb8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import
org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushLimitOffsetIntoTableScan;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -53,7 +52,7 @@ public class TableDistributionPlanner {
this.analysis = analysis;
this.logicalQueryPlan = logicalQueryPlan;
this.mppQueryContext = mppQueryContext;
- this.optimizers = Arrays.asList(new PushLimitOffsetIntoTableScan(), new
SortElimination());
+ this.optimizers = Arrays.asList(new SortElimination());
}
public DistributedQueryPlan plan() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 08369519559..1309d364398 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.tsfile.read.common.type.BooleanType;
@@ -120,6 +121,12 @@ public class TableModelTypeProviderExtractor {
return null;
}
+ /** Visits each child of the TopKNode with this visitor; nothing is recorded for the node itself. */
+ @Override
+ public Void visitTopK(TopKNode node, Void context) {
+ node.getChildren().forEach(c -> c.accept(this, context));
+ return null;
+ }
+
@Override
public Void visitStreamSort(StreamSortNode node, Void context) {
node.getChild().accept(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
new file mode 100644
index 00000000000..cc78b745b72
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.sort;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Transforms:
+ *
+ * <pre>
+ * - Limit (limit = x)
+ *    - Project (identity, narrowing)
+ *       - Sort (order by a, b)
+ * </pre>
+ *
+ * Into:
+ *
+ * <pre>
+ * - Project (identity, narrowing)
+ *    - TopK (limit = x, order by a, b)
+ * </pre>
+ *
+ * <p>The rule does not fire when the matched sort is a {@link StreamSortNode}. The pattern applies
+ * no "with ties" check to the limit.
+ */
+public class MergeLimitOverProjectWithSort implements Rule<LimitNode> {
+  private static final Capture<ProjectNode> PROJECT = newCapture();
+  private static final Capture<SortNode> SORT = newCapture();
+
+  // TODO: restrict to limits without ties if LimitNode ever exposes such a property.
+  private static final Pattern<LimitNode> PATTERN =
+      limit()
+          .with(
+              source()
+                  .matching(
+                      project()
+                          .capturedAs(PROJECT)
+                          .matching(ProjectNode::isIdentity)
+                          .with(source().matching(sort().capturedAs(SORT)))));
+
+  @Override
+  public Pattern<LimitNode> getPattern() {
+    return PATTERN;
+  }
+
+  /**
+   * Replaces the matched Limit/Project/Sort chain with the Project on top of a new TopKNode.
+   *
+   * @param parent the matched limit; supplies the plan node id and the row count
+   * @param captures holds the captured project and sort nodes
+   * @param context rule context (unused)
+   * @return the rewritten plan, or an empty result for a {@link StreamSortNode}
+   */
+  @Override
+  public Result apply(LimitNode parent, Captures captures, Context context) {
+    ProjectNode project = captures.get(PROJECT);
+    SortNode sort = captures.get(SORT);
+
+    if (sort instanceof StreamSortNode) {
+      return Result.empty();
+    }
+
+    // The TopKNode takes the Sort's place underneath the Project, so it has to expose the Sort's
+    // output symbols. The Limit's symbols are the Project's (possibly narrowed) outputs and may
+    // leave out columns that the ordering scheme still needs.
+    return Result.ofPlanNode(
+        project.replaceChildren(
+            ImmutableList.of(
+                new TopKNode(
+                    parent.getPlanNodeId(),
+                    sort.getChildren(),
+                    sort.getOrderingScheme(),
+                    parent.getCount(),
+                    sort.getOutputSymbols(),
+                    sort.isOrderByAllIdsAndTime()))));
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
new file mode 100644
index 00000000000..49913558f38
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.sort;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Rewrites a Limit that sits directly above a Sort into a single TopKNode:
+ *
+ * <pre>
+ * - Limit (limit = x)
+ *    - Sort (order by a, b)
+ * </pre>
+ *
+ * becomes
+ *
+ * <pre>
+ * - TopK (limit = x, order by a, b)
+ * </pre>
+ *
+ * <p>A sort that is a {@link StreamSortNode} is left untouched. The pattern applies no "with ties"
+ * check to the limit.
+ */
+public class MergeLimitWithSort implements Rule<LimitNode> {
+  private static final Capture<SortNode> SORT = newCapture();
+
+  private static final Pattern<LimitNode> PATTERN =
+      limit().with(source().matching(sort().capturedAs(SORT)));
+
+  @Override
+  public Pattern<LimitNode> getPattern() {
+    return PATTERN;
+  }
+
+  /**
+   * Builds the TopKNode that replaces the matched Limit/Sort pair.
+   *
+   * @param parent the matched limit; supplies the plan node id, row count and output symbols
+   * @param captures holds the captured sort node
+   * @param context rule context (unused)
+   * @return the TopKNode, or an empty result for a {@link StreamSortNode}
+   */
+  @Override
+  public Result apply(LimitNode parent, Captures captures, Context context) {
+    SortNode sortNode = captures.get(SORT);
+    if (sortNode instanceof StreamSortNode) {
+      return Result.empty();
+    }
+
+    TopKNode topK =
+        new TopKNode(
+            parent.getPlanNodeId(),
+            sortNode.getChildren(),
+            sortNode.getOrderingScheme(),
+            parent.getCount(),
+            parent.getOutputSymbols(),
+            sortNode.isOrderByAllIdsAndTime());
+    return Result.ofPlanNode(topK);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
index d086a685b4c..6afca034475 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
@@ -97,6 +97,9 @@ public class PruneTableScanColumns extends
ProjectOffPushDownRule<TableScanNode>
node.getIdAndAttributeIndexMap(),
node.getScanOrder(),
node.getTimePredicate().orElse(null),
- node.getPushDownPredicate()));
+ node.getPushDownPredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice()));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
new file mode 100644
index 00000000000..e95b1efa16b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import com.google.common.collect.Streams;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.topK;
+
+/**
+ * Project-off push-down rule for {@link TopKNode}: the symbols kept below the node are the ones
+ * referenced above it plus the symbols of its ordering scheme.
+ */
+public class PruneTopKColumns extends ProjectOffPushDownRule<TopKNode> {
+  public PruneTopKColumns() {
+    super(topK());
+  }
+
+  /**
+   * Computes the symbols the TopKNode still needs from its children and delegates the actual
+   * restriction to {@code Util.restrictChildOutputs}.
+   *
+   * @param context rule context; supplies the id allocator
+   * @param topKNode the node whose child outputs are restricted
+   * @param referencedOutputs symbols of the node that are referenced by its consumer
+   * @return whatever {@code restrictChildOutputs} returns for the computed symbol set
+   */
+  @Override
+  protected Optional<PlanNode> pushDownProjectOff(
+      Context context, TopKNode topKNode, Set<Symbol> referencedOutputs) {
+    // Ordering columns must survive pruning even when nothing above the node references them.
+    Set<Symbol> prunedTopKInputs =
+        Streams.concat(
+                referencedOutputs.stream(), topKNode.getOrderingScheme().getOrderBy().stream())
+            .collect(toImmutableSet());
+
+    return restrictChildOutputs(context.getIdAllocator(), topKNode, prunedTopKInputs);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index 489df2f8366..541db20fe00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -160,14 +160,13 @@ public final class Patterns {
return typeOf(TableScanNode.class);
}
- /*public static Pattern<TableWriterNode> tableWriterNode()
- {
- return typeOf(TableWriterNode.class);
+ /** Returns the pattern produced by {@code typeOf(TopKNode.class)}, for rules that target TopKNode. */
+ public static Pattern<TopKNode> topK() {
+ return typeOf(TopKNode.class);
}
- public static Pattern<TopNNode> topN()
+ /*public static Pattern<TableWriterNode> tableWriterNode()
{
- return typeOf(TopNNode.class);
+ return typeOf(TableWriterNode.class);
}
public static Pattern<UnionNode> union()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 2e85d983fb7..67e9ad4f2aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -34,16 +34,24 @@ import java.util.List;
public class SortNode extends SingleChildProcessNode {
protected final OrderingScheme orderingScheme;
protected final boolean partial;
-
- public SortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme,
boolean partial) {
+ // Set when the ordering covers all ID columns and time; in that case this sort node may be eliminated.
+ protected boolean orderByAllIdsAndTime;
+
+ /**
+ * Creates a sort node.
+ *
+ * @param id plan node id, passed to the parent constructor
+ * @param child the single child, passed to the parent constructor
+ * @param scheme ordering scheme stored on this node
+ * @param partial value stored in the {@code partial} flag
+ * @param orderByAllIdsAndTime value stored in the {@code orderByAllIdsAndTime} flag
+ */
+ public SortNode(
+ PlanNodeId id,
+ PlanNode child,
+ OrderingScheme scheme,
+ boolean partial,
+ boolean orderByAllIdsAndTime) {
super(id, child);
this.orderingScheme = scheme;
this.partial = partial;
+ this.orderByAllIdsAndTime = orderByAllIdsAndTime;
}
@Override
public PlanNode clone() {
- return new SortNode(id, null, orderingScheme, partial);
+ return new SortNode(id, null, orderingScheme, partial,
orderByAllIdsAndTime);
}
@Override
@@ -60,21 +68,21 @@ public class SortNode extends SingleChildProcessNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TABLE_SORT_NODE.serialize(byteBuffer);
orderingScheme.serialize(byteBuffer);
+ // Write both flags, in the same order deserialize(ByteBuffer) reads them.
ReadWriteIOUtils.write(partial, byteBuffer);
+ ReadWriteIOUtils.write(orderByAllIdsAndTime, byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.TABLE_SORT_NODE.serialize(stream);
orderingScheme.serialize(stream);
+ // Same field order as the ByteBuffer overload: partial, then orderByAllIdsAndTime.
ReadWriteIOUtils.write(partial, stream);
+ ReadWriteIOUtils.write(orderByAllIdsAndTime, stream);
}
+ /**
+ * Rebuilds a SortNode (without child) from the attributes written by serializeAttributes:
+ * ordering scheme, partial flag, orderByAllIdsAndTime flag, then the plan node id.
+ */
public static SortNode deserialize(ByteBuffer byteBuffer) {
OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
boolean partial = ReadWriteIOUtils.readBool(byteBuffer);
+ boolean orderByAllIdsAndTime = ReadWriteIOUtils.readBool(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new SortNode(planNodeId, null, orderingScheme, partial);
+ return new SortNode(planNodeId, null, orderingScheme, partial, orderByAllIdsAndTime);
}
@Override
@@ -84,7 +92,8 @@ public class SortNode extends SingleChildProcessNode {
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren) {
- return new SortNode(id, Iterables.getOnlyElement(newChildren),
orderingScheme, partial);
+ return new SortNode(
+ id, Iterables.getOnlyElement(newChildren), orderingScheme, partial,
orderByAllIdsAndTime);
}
public OrderingScheme getOrderingScheme() {
@@ -95,6 +104,14 @@ public class SortNode extends SingleChildProcessNode {
return this.partial;
}
+ /** Returns the current value of the {@code orderByAllIdsAndTime} flag. */
+ public boolean isOrderByAllIdsAndTime() {
+ return orderByAllIdsAndTime;
+ }
+
+ /** Overwrites the {@code orderByAllIdsAndTime} flag on this node. */
+ public void setOrderByAllIdsAndTime(boolean orderByAllIdsAndTime) {
+ this.orderByAllIdsAndTime = orderByAllIdsAndTime;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index 7ee9dd322fa..8a35a80bf22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -43,8 +43,9 @@ public class StreamSortNode extends SortNode {
PlanNode child,
OrderingScheme scheme,
boolean partial,
+ boolean orderByAllIdsAndTime,
int streamCompareKeyEndIndex) {
- super(id, child, scheme, partial);
+ super(id, child, scheme, partial, orderByAllIdsAndTime);
this.streamCompareKeyEndIndex = streamCompareKeyEndIndex;
}
@@ -59,6 +60,7 @@ public class StreamSortNode extends SortNode {
Iterables.getOnlyElement(newChildren),
orderingScheme,
partial,
+ orderByAllIdsAndTime,
streamCompareKeyEndIndex);
}
@@ -69,7 +71,8 @@ public class StreamSortNode extends SortNode {
@Override
public PlanNode clone() {
- return new StreamSortNode(id, null, orderingScheme, partial,
streamCompareKeyEndIndex);
+ return new StreamSortNode(
+ id, null, orderingScheme, partial, orderByAllIdsAndTime,
streamCompareKeyEndIndex);
}
@Override
@@ -93,7 +96,8 @@ public class StreamSortNode extends SortNode {
boolean partial = ReadWriteIOUtils.readBool(byteBuffer);
int streamCompareKeyEndIndex = ReadWriteIOUtils.read(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new StreamSortNode(planNodeId, null, orderingScheme, partial,
streamCompareKeyEndIndex);
+ return new StreamSortNode(
+ planNodeId, null, orderingScheme, partial, false,
streamCompareKeyEndIndex);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 98f21b2b9a8..770ab775c05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -118,7 +118,10 @@ public class TableScanNode extends SourceNode {
Map<Symbol, Integer> idAndAttributeIndexMap,
Ordering scanOrder,
Expression timePredicate,
- Expression pushDownPredicate) {
+ Expression pushDownPredicate,
+ long pushDownLimit,
+ long pushDownOffset,
+ boolean pushLimitToEachDevice) {
super(id);
this.qualifiedObjectName = qualifiedObjectName;
this.outputSymbols = outputSymbols;
@@ -128,6 +131,9 @@ public class TableScanNode extends SourceNode {
this.scanOrder = scanOrder;
this.timePredicate = timePredicate;
this.pushDownPredicate = pushDownPredicate;
+ this.pushDownLimit = pushDownLimit;
+ this.pushDownOffset = pushDownOffset;
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
}
@Override
@@ -154,7 +160,10 @@ public class TableScanNode extends SourceNode {
idAndAttributeIndexMap,
scanOrder,
timePredicate,
- pushDownPredicate);
+ pushDownPredicate,
+ pushDownLimit,
+ pushDownOffset,
+ pushLimitToEachDevice);
}
@Override
@@ -214,6 +223,10 @@ public class TableScanNode extends SourceNode {
} else {
ReadWriteIOUtils.write(false, byteBuffer);
}
+
+ ReadWriteIOUtils.write(pushDownLimit, byteBuffer);
+ ReadWriteIOUtils.write(pushDownOffset, byteBuffer);
+ ReadWriteIOUtils.write(pushLimitToEachDevice, byteBuffer);
}
@Override
@@ -264,6 +277,10 @@ public class TableScanNode extends SourceNode {
} else {
ReadWriteIOUtils.write(false, stream);
}
+
+ ReadWriteIOUtils.write(pushDownLimit, stream);
+ ReadWriteIOUtils.write(pushDownOffset, stream);
+ ReadWriteIOUtils.write(pushLimitToEachDevice, stream);
}
public static TableScanNode deserialize(ByteBuffer byteBuffer) {
@@ -314,6 +331,10 @@ public class TableScanNode extends SourceNode {
pushDownPredicate = Expression.deserialize(byteBuffer);
}
+ long pushDownLimit = ReadWriteIOUtils.readLong(byteBuffer);
+ long pushDownOffset = ReadWriteIOUtils.readLong(byteBuffer);
+ boolean pushLimitToEachDevice = ReadWriteIOUtils.readBool(byteBuffer);
+
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new TableScanNode(
@@ -325,7 +346,10 @@ public class TableScanNode extends SourceNode {
idAndAttributeIndexMap,
scanOrder,
timePredicate,
- pushDownPredicate);
+ pushDownPredicate,
+ pushDownLimit,
+ pushDownOffset,
+ pushLimitToEachDevice);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
index 221bcc8c892..4625bc2eae8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
@@ -42,7 +42,7 @@ public class TopKNode extends MultiChildProcessNode {
private final OrderingScheme orderingScheme;
- private final int count;
+ private final long count;
private final List<Symbol> outputSymbols;
@@ -51,7 +51,7 @@ public class TopKNode extends MultiChildProcessNode {
public TopKNode(
PlanNodeId id,
OrderingScheme scheme,
- int count,
+ long count,
List<Symbol> outputSymbols,
boolean childrenDataInOrder) {
super(id);
@@ -65,7 +65,7 @@ public class TopKNode extends MultiChildProcessNode {
PlanNodeId id,
List<PlanNode> children,
OrderingScheme scheme,
- int count,
+ long count,
List<Symbol> outputSymbols,
boolean childrenDataInOrder) {
super(id, children);
@@ -142,7 +142,7 @@ public class TopKNode extends MultiChildProcessNode {
return orderingScheme;
}
- public int getCount() {
+ public long getCount() {
return count;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
index 72ca95bd0e4..2a66a20b6d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
@@ -18,6 +18,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFilterColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneLimitColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneOffsetColumns;
@@ -25,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneProjectColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneSortColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneTableScanColumns;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneTopKColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughOffset;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughProject;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantIdentityProjections;
@@ -53,7 +56,8 @@ public class OptimizeFactory {
new PruneOutputSourceColumns(),
new PruneProjectColumns(),
new PruneSortColumns(),
- new PruneTableScanColumns(plannerContext.getMetadata()));
+ new PruneTableScanColumns(plannerContext.getMetadata()),
+ new PruneTopKColumns());
IterativeOptimizer columnPruningOptimizer =
new IterativeOptimizer(plannerContext, new RuleStatsRecorder(),
columnPruningRules);
@@ -64,10 +68,19 @@ public class OptimizeFactory {
ImmutableSet.of(
new InlineProjections(plannerContext), new
RemoveRedundantIdentityProjections()));
- Set<Rule<?>> limitPushdownRules =
- ImmutableSet.of(new PushLimitThroughOffset(), new
PushLimitThroughProject());
IterativeOptimizer limitPushdownOptimizer =
- new IterativeOptimizer(plannerContext, new RuleStatsRecorder(),
limitPushdownRules);
+ new IterativeOptimizer(
+ plannerContext,
+ new RuleStatsRecorder(),
+ ImmutableSet.of(new PushLimitThroughOffset(), new
PushLimitThroughProject()));
+
+ PlanOptimizer pushLimitOffsetIntoTableScanOptimizer = new
PushLimitOffsetIntoTableScan();
+
+ IterativeOptimizer topKOptimizer =
+ new IterativeOptimizer(
+ plannerContext,
+ new RuleStatsRecorder(),
+ ImmutableSet.of(new MergeLimitWithSort(), new
MergeLimitOverProjectWithSort()));
this.planOptimizers =
ImmutableList.of(
@@ -79,7 +92,9 @@ public class OptimizeFactory {
columnPruningOptimizer,
inlineProjectionsOptimizer,
limitPushdownOptimizer,
- transformSortToStreamSortOptimizer);
+ pushLimitOffsetIntoTableScanOptimizer,
+ transformSortToStreamSortOptimizer,
+ topKOptimizer);
}
public List<PlanOptimizer> getPlanOptimizers() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index a11683dde28..ed5faef0f35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -43,7 +44,7 @@ import static
org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
/**
* <b>Optimization phase:</b> Distributed plan planning.
*
- * <p>The LIMIT OFFSET condition can be pushed down to the SeriesScanNode,
when the following
+ * <p>The LIMIT OFFSET condition can be pushed down to the TableScanNode, when
the following
* conditions are met:
* <li>Time series query (not aggregation query).
* <li>The query expressions are all scalar expression.
@@ -87,9 +88,10 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
@Override
public PlanNode visitOffset(OffsetNode node, Context context) {
context.setOffset(node.getCount());
- if (context.getLimit() > 0) {
- context.setLimit(context.getLimit() + context.getOffset());
- }
+ // limit + offset is already computed by rule {@link PushLimitThroughOffset}
+ // if (context.getLimit() > 0) {
+ // context.setLimit(context.getLimit() + context.getOffset());
+ // }
node.setChild(node.getChild().accept(this, context));
return node;
}
@@ -163,6 +165,12 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
return node;
}
+ @Override
+ public PlanNode visitTopK(TopKNode node, Context context) {
+ throw new IllegalStateException(
+ "TopKNode must be appeared after PushLimitOffsetIntoTableScan");
+ }
+
@Override
public PlanNode visitStreamSort(StreamSortNode node, Context context) {
return visitSort(node, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 9ea378df087..b9c7fe2d81a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -91,11 +91,20 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
}
if (streamSortIndex > 0) {
+ boolean orderByAllIdsAndTime = true;
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableColumnSchema.entrySet()) {
+ if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+ && !orderingScheme.getOrderings().containsKey(entry.getKey())) {
+ orderByAllIdsAndTime = false;
+ }
+ }
+
return new StreamSortNode(
queryContext.getQueryId().genPlanNodeId(),
child,
node.getOrderingScheme(),
node.isPartial(),
+ orderByAllIdsAndTime,
streamSortIndex);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index c612a697540..b636257ce0c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -27,10 +27,12 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.junit.Test;
@@ -65,7 +67,6 @@ public class LimitOffsetPushDownTest {
TableScanNode tableScanNode;
// without sort operation, limit can be pushed into TableScan,
pushLimitToEachDevice==false
- // Output - Project - Limit - Offset - Collect - TableScan
@Test
public void noOrderByTest() {
sql = "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 offset
5 limit 10";
@@ -75,8 +76,11 @@ public class LimitOffsetPushDownTest {
new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
+ // LogicalPlan: `Output - Project - Offset - Limit - TableScan`
+ assertTrue(getChildrenNode(rootNode, 3) instanceof LimitNode);
assertTrue(getChildrenNode(rootNode, 4) instanceof TableScanNode);
+ // DistributePlan: `Output - Project - Offset - Limit - Collect -
TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -102,7 +106,6 @@ public class LimitOffsetPushDownTest {
}
// order by all tags, limit can be pushed into TableScan,
pushLimitToEachDevice==false
- // Output - Limit - Offset - Project - MergeSort - Project - TableScan
@Test
public void orderByAllTagsTest() {
sql =
@@ -113,8 +116,10 @@ public class LimitOffsetPushDownTest {
new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
+ // LogicalPlan: `Output - Offset - Limit - Project - StreamSort - Project
- TableScan`
assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+ // DistributePlan: `Output - Offset - Limit - Project - MergeSort -
Project - TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -140,7 +145,6 @@ public class LimitOffsetPushDownTest {
}
// order by some tags, limit can be pushed into TableScan,
pushLimitToEachDevice==true
- // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
@Test
public void orderBySomeTagsTest() {
sql =
@@ -151,8 +155,12 @@ public class LimitOffsetPushDownTest {
new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
+ // LogicalPlan: `Output - Offset - Limit - Project - StreamSort - Project
- TableScan`
+ assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+ // DistributePlan: `Output - Offset - Limit - Project - MergeSort -
StreamSort - Project -
+ // TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -160,7 +168,7 @@ public class LimitOffsetPushDownTest {
(MergeSortNode)
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
5);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
assertEquals(4, tableScanNode.getDeviceEntries().size());
@@ -178,7 +186,6 @@ public class LimitOffsetPushDownTest {
}
// order by time, limit can be pushed into TableScan,
pushLimitToEachDevice==true
- // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
@Test
public void orderByTimeTest() {
sql =
@@ -188,19 +195,22 @@ public class LimitOffsetPushDownTest {
logicalQueryPlan =
new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
.plan(actualAnalysis);
+ // LogicalPlan: `Output - Offset - Project - TopK - Project - TableScan`
rootNode = logicalQueryPlan.getRootNode();
- assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+ assertTrue(getChildrenNode(rootNode, 5) instanceof TableScanNode);
+ // DistributePlan-1 `Identity - Output - Offset - Project - TopK -
{Exchange + TopK + Exchange}`
+ // DistributePlan-2 `Identity - TopK - Project - TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- MergeSortNode mergeSortNode =
- (MergeSortNode)
-
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
5);
- assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
- assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+ TopKNode topKNode =
+ (TopKNode)
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+ assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(topKNode.getChildren().get(1), 2);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(DESC, tableScanNode.getScanOrder());
assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
@@ -216,7 +226,6 @@ public class LimitOffsetPushDownTest {
}
// order by others, limit can not be pushed into TableScan
- // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
@Test
public void orderByOthersTest() {
sql =
@@ -226,19 +235,22 @@ public class LimitOffsetPushDownTest {
logicalQueryPlan =
new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
.plan(actualAnalysis);
+ // LogicalPlan: `Output - Offset - Project - TopK - Project - TableScan`
rootNode = logicalQueryPlan.getRootNode();
- assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+ assertTrue(getChildrenNode(rootNode, 5) instanceof TableScanNode);
+ // DistributePlan-1 `Identity - Output - Offset - Project - TopK -
{Exchange + TopK + Exchange}`
+ // DistributePlan-2 `Identity - TopK - Project - TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- MergeSortNode mergeSortNode =
- (MergeSortNode)
-
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
5);
- assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
- assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+ TopKNode topKNode =
+ (TopKNode)
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+ assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(topKNode.getChildren().get(1), 2);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(ASC, tableScanNode.getScanOrder());
assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index 2242ebb74ca..d66b2c4899e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.junit.Test;
@@ -82,34 +83,6 @@ public class SortTest {
DistributedQueryPlan distributedQueryPlan;
TableScanNode tableScanNode;
- /*
- * order by time, others, some_ids
- *
- * IdentitySinkNode-33
- * └──OutputNode-8
- * └──LimitNode-7
- * └──OffsetNode-6
- * └──ProjectNode
- * └──MergeSortNode-25
- * ├──ExchangeNode-29:
[SourceAddress:192.0.12.1/test_query.2.0/31]
- * ├──SortNode-27
- * │ └──ProjectNode-23
- * │ └──FilterNode-17
- * │ └──TableScanNode-14
- * └──ExchangeNode-30:
[SourceAddress:192.0.10.1/test_query.3.0/32]
- *
- * IdentitySinkNode-31
- * └──SortNode-26
- * └──ProjectNode-22
- * └──FilterNode-16
- * └──TableScanNode-13
- *
- * IdentitySinkNode-31
- * └──SortNode-26
- * └──ProjectNode-19
- * └──FilterNode-16
- * └──TableScanNode-13
- */
@Test
public void timeOthersSomeIDColumnSortTest() {
sql =
@@ -122,32 +95,22 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
+ // LogicalPlan: `Output - Offset - Project - TopK - Project - FilterNode -
TableScan`
assertTrue(rootNode instanceof OutputNode);
assertTrue(getChildrenNode(rootNode, 1) instanceof OffsetNode);
- assertTrue(getChildrenNode(rootNode, 2) instanceof LimitNode);
- assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
- SortNode sortNode =
- (SortNode)
- rootNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- // TODO(beyyes) merge parent and child ProjectNode into one, parent
contains all children
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+ assertTrue(getChildrenNode(rootNode, 4) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 5) instanceof FilterNode);
+ assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(rootNode, 6);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
assertEquals(8, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // Output - Limit - Offset -Project - MergeSort - Sort - Project - Filter
- TableScan
+ // DistributePlan-1 `Identity - Output - Offset - Project - TopK -
{Exchange + TopK + Exchange}`
+ // DistributePlan-2 `Identity - TopK - Project - Filter - TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -158,25 +121,12 @@ public class SortTest {
(OutputNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
- assertTrue(getChildrenNode(outputNode, 2) instanceof LimitNode);
- assertTrue(
-
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- MergeSortNode mergeSortNode =
- (MergeSortNode)
- outputNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
- assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof TopKNode);
+ TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 3);
+ assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+ assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+ SortNode sortNode = (SortNode) topKNode.getChildren().get(1);
assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
tableScanNode =
@@ -194,7 +144,7 @@ public class SortTest {
assertEquals(DESC, tableScanNode.getScanOrder());
assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
- // IdentitySink - Sort - Project - Filter - TableScan
+ // IdentitySink - TopK - Project - Filter - TableScan
assertTrue(
distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
assertTrue(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
index bcf776321f0..4101d092efa 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
@@ -81,7 +81,10 @@ public class TestPlanBuilder {
Map<Symbol, Integer> idAndAttributeIndexMap,
Ordering scanOrder,
Expression timePredicate,
- Expression pushDownPredicate) {
+ Expression pushDownPredicate,
+ long pushDownLimit,
+ long pushDownOffset,
+ boolean pushLimitToEachDevice) {
this.root =
new TableScanNode(
new PlanNodeId(id),
@@ -92,7 +95,10 @@ public class TestPlanBuilder {
idAndAttributeIndexMap,
scanOrder,
timePredicate,
- pushDownPredicate);
+ pushDownPredicate,
+ pushDownLimit,
+ pushDownOffset,
+ pushLimitToEachDevice);
return this;
}
}