This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new d3d3b606f9c Add stream sort and sort elimination optimize rule
d3d3b606f9c is described below
commit d3d3b606f9cff833152bf7e22d567d425c4b9f9b
Author: Beyyes <[email protected]>
AuthorDate: Thu Jul 18 13:10:07 2024 +0800
Add stream sort and sort elimination optimize rule
---
.../plan/relational/analyzer/Analysis.java | 15 +-
.../plan/relational/planner/LogicalPlanner.java | 5 +-
.../plan/relational/planner/QueryPlanner.java | 1 +
.../distribute/DistributedPlanGenerator.java | 32 ++
.../distribute/TableDistributionPlanner.java | 4 +-
.../TableModelTypeProviderExtractor.java | 7 +
.../plan/relational/planner/node/FilterNode.java | 2 +-
.../plan/relational/planner/node/LimitNode.java | 2 +-
.../plan/relational/planner/node/OffsetNode.java | 2 +-
.../plan/relational/planner/node/OutputNode.java | 2 +-
.../plan/relational/planner/node/ProjectNode.java | 2 +-
.../plan/relational/planner/node/SortNode.java | 6 +-
.../relational/planner/node/StreamSortNode.java | 17 +
.../planner/optimizations/OptimizeFactory.java | 4 +-
.../PushLimitOffsetIntoTableScan.java | 6 +
.../planner/optimizations/SortElimination.java | 134 ++++++
.../optimizations/TransformSortToStreamSort.java | 123 +++++
.../analyzer/LimitOffsetPushDownTest.java | 11 +-
.../plan/relational/analyzer/SortTest.java | 503 +++++----------------
19 files changed, 483 insertions(+), 395 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index f913ddbf98e..095248a7391 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -169,9 +169,12 @@ public class Analysis implements IAnalysis {
private boolean finishQueryAfterAnalyze;
- // indicate is there a value filter
+ // indicate if value filter exists in query
private boolean hasValueFilter = false;
+ // indicate if sort node exists in query
+ private boolean hasSortNode = false;
+
// if emptyDataSource, there is no need to execute the query in BE
private boolean emptyDataSource = false;
@@ -585,10 +588,18 @@ public class Analysis implements IAnalysis {
return hasValueFilter;
}
- public void setHasValueFilter(boolean hasValueFilter) {
+ public void setValueFilter(boolean hasValueFilter) {
this.hasValueFilter = hasValueFilter;
}
+ public boolean hasSortNode() {
+ return hasSortNode;
+ }
+
+ public void setSortNode(boolean hasSortNode) {
+ this.hasSortNode = hasSortNode;
+ }
+
public boolean isEmptyDataSource() {
return emptyDataSource;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 7d778d2ebf2..c88dc681a13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -75,8 +75,9 @@ public class LogicalPlanner {
this.metadata = metadata;
this.sessionInfo = requireNonNull(sessionInfo, "session is null");
this.warningCollector = requireNonNull(warningCollector, "warningCollector
is null");
- PlannerContext plannerContext = new PlannerContext(metadata, new
InternalTypeManager());
- this.planOptimizers = new
OptimizeFactory(plannerContext).getPlanOptimizers();
+ this.planOptimizers =
+ new OptimizeFactory(new PlannerContext(metadata, new
InternalTypeManager()))
+ .getPlanOptimizers();
}
public LogicalQueryPlan plan(Analysis analysis) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index ae7f47f3058..a9c094d7d9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -152,6 +152,7 @@ public class QueryPlanner {
outputs.stream().map(builder::translate).forEach(newFields::add);
builder =
builder.withScope(analysis.getScope(node.getOrderBy().orElse(null)), newFields);
+ analysis.setSortNode(true);
}
List<Expression> orderBy = analysis.getOrderByExpressions(node);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 61543eda7d1..f37c384e4d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -218,6 +219,37 @@ public class DistributedPlanGenerator
return Collections.singletonList(mergeSortNode);
}
+ @Override
+ public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext
context) {
+ context.expectedOrderingScheme = node.getOrderingScheme();
+ context.hasSortNode = true;
+ nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+
+ List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+ if (childrenNodes.size() == 1) {
+ node.setChild(childrenNodes.get(0));
+ return Collections.singletonList(node);
+ }
+
+ // a ProjectNode may be placed above the sort later, so return a single
MergeSortNode rather than a list of sort nodes
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ queryId.genPlanNodeId(), node.getOrderingScheme(),
node.getOutputSymbols());
+ for (PlanNode child : childrenNodes) {
+ StreamSortNode subSortNode =
+ new StreamSortNode(
+ queryId.genPlanNodeId(),
+ child,
+ node.getOrderingScheme(),
+ false,
+ node.getStreamCompareKeyEndIndex());
+ mergeSortNode.addChild(subSortNode);
+ }
+ nodeOrderingMap.put(mergeSortNode.getPlanNodeId(),
mergeSortNode.getOrderingScheme());
+
+ return Collections.singletonList(mergeSortNode);
+ }
+
@Override
public List<PlanNode> visitFilter(FilterNode node, PlanContext context) {
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index ce2c70b166b..04caab49710 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -30,8 +30,10 @@ import
org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.Plan
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushLimitOffsetIntoTableScan;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -51,7 +53,7 @@ public class TableDistributionPlanner {
this.analysis = analysis;
this.logicalQueryPlan = logicalQueryPlan;
this.mppQueryContext = mppQueryContext;
- this.optimizers = Collections.singletonList(new
PushLimitOffsetIntoTableScan());
+ this.optimizers = Arrays.asList(new PushLimitOffsetIntoTableScan(), new
SortElimination());
}
public DistributedQueryPlan plan() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 9065ddaa4dc..08369519559 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -119,6 +120,12 @@ public class TableModelTypeProviderExtractor {
return null;
}
+ @Override
+ public Void visitStreamSort(StreamSortNode node, Void context) {
+ node.getChild().accept(this, context);
+ return null;
+ }
+
@Override
public Void visitMergeSort(MergeSortNode node, Void context) {
node.getChildren().forEach(c -> c.accept(this, context));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
index 52485ca9789..812979ed239 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
@@ -50,7 +50,7 @@ public class FilterNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new FilterNode(id, child, predicate);
+ return new FilterNode(id, null, predicate);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
index a0862384d87..8e7b0292b0a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
@@ -54,7 +54,7 @@ public class LimitNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new LimitNode(id, child, count, tiesResolvingScheme);
+ return new LimitNode(id, null, count, tiesResolvingScheme);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
index 334e22420a9..fc363190d2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
@@ -40,7 +40,7 @@ public class OffsetNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new OffsetNode(id, child, count);
+ return new OffsetNode(id, null, count);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index 8061dd043cb..219b68d3e72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -62,7 +62,7 @@ public class OutputNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new OutputNode(id, child, columnNames, outputSymbols);
+ return new OutputNode(id, null, columnNames, outputSymbols);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
index 0653ac161d8..05fb45ecd07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
@@ -49,7 +49,7 @@ public class ProjectNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new ProjectNode(id, child, assignments);
+ return new ProjectNode(id, null, assignments);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 8ba5839fa00..2e85d983fb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -43,7 +43,7 @@ public class SortNode extends SingleChildProcessNode {
@Override
public PlanNode clone() {
- return new SortNode(id, child, orderingScheme, partial);
+ return new SortNode(id, null, orderingScheme, partial);
}
@Override
@@ -91,6 +91,10 @@ public class SortNode extends SingleChildProcessNode {
return orderingScheme;
}
+ public boolean isPartial() {
+ return this.partial;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index b0d419b0840..7ee9dd322fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -26,11 +26,13 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
public class StreamSortNode extends SortNode {
@@ -50,11 +52,26 @@ public class StreamSortNode extends SortNode {
return streamCompareKeyEndIndex;
}
+ @Override
+ public PlanNode replaceChildren(List<PlanNode> newChildren) {
+ return new StreamSortNode(
+ id,
+ Iterables.getOnlyElement(newChildren),
+ orderingScheme,
+ partial,
+ streamCompareKeyEndIndex);
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitStreamSort(this, context);
}
+ @Override
+ public PlanNode clone() {
+ return new StreamSortNode(id, null, orderingScheme, partial,
streamCompareKeyEndIndex);
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TABLE_STREAM_SORT_NODE.serialize(byteBuffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
index 486721b46e9..08de786d49d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
@@ -41,6 +41,7 @@ public class OptimizeFactory {
PlanOptimizer simplifyExpressionOptimizer = new SimplifyExpressions();
PlanOptimizer pushPredicateIntoTableScanOptimizer = new
PushPredicateIntoTableScan();
+ PlanOptimizer transformSortToStreamSortOptimizer = new
TransformSortToStreamSort();
Set<Rule<?>> columnPruningRules =
ImmutableSet.of(
@@ -69,7 +70,8 @@ public class OptimizeFactory {
pushPredicateIntoTableScanOptimizer,
// redo columnPrune and inlineProjections after
pushPredicateIntoTableScan
columnPruningOptimizer,
- inlineProjectionsOptimizer);
+ inlineProjectionsOptimizer,
+ transformSortToStreamSortOptimizer);
}
public List<PlanOptimizer> getPlanOptimizers() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 081ea33983e..a11683dde28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -162,6 +163,11 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
return node;
}
+ @Override
+ public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+ return visitSort(node, context);
+ }
+
@Override
public PlanNode visitFilter(FilterNode node, Context context) {
// If there is still a FilterNode here, it means that there are read
filter conditions that
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
new file mode 100644
index 00000000000..23bb78c8da4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning.
+ *
+ * <p>This optimization rule implements the rules below.
+ * <li>When ordering by time and there is only one device entry in the TableScanNode
below, the SortNode
+ * can be eliminated.
+ * <li>When ordering by all IDColumns and time, the SortNode can be eliminated.
+ */
+public class SortElimination implements PlanOptimizer {
+
+ @Override
+ public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+ if (!context.getAnalysis().hasSortNode()) {
+ return plan;
+ }
+
+ return plan.accept(new Rewriter(context.getAnalysis()), new Context());
+ }
+
+ private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+ private final Analysis analysis;
+
+ public Rewriter(Analysis analysis) {
+ this.analysis = analysis;
+ }
+
+ @Override
+ public PlanNode visitPlan(PlanNode node, Context context) {
+ PlanNode newNode = node.clone();
+ for (PlanNode child : node.getChildren()) {
+ newNode.addChild(child.accept(this, context));
+ }
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitSort(SortNode node, Context context) {
+ PlanNode child = node.getChild().accept(this, context);
+ TableScanNode tableScanNode = context.getTableScanNode();
+ OrderingScheme orderingScheme = node.getOrderingScheme();
+ if (tableScanNode.getDeviceEntries().size() == 1
+ &&
TIMESTAMP_STR.equalsIgnoreCase(orderingScheme.getOrderBy().get(0).getName())) {
+ return child;
+ }
+ return tryRemoveSortWhenOrderByAllIDsAndTime(node, child, tableScanNode);
+ }
+
+ @Override
+ public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+ PlanNode child = node.getChild().accept(this, context);
+ TableScanNode tableScanNode = context.getTableScanNode();
+ return tryRemoveSortWhenOrderByAllIDsAndTime(node, child, tableScanNode);
+ }
+
+ @Override
+ public PlanNode visitTableScan(TableScanNode node, Context context) {
+ context.setTableScanNode(node);
+ return node;
+ }
+
+ private PlanNode tryRemoveSortWhenOrderByAllIDsAndTime(
+ SortNode sortNode, PlanNode child, TableScanNode tableScanNode) {
+ Set<Symbol> sortSymbolsBeforeTime = new HashSet<>();
+ Map<Symbol, ColumnSchema> tableColumnSchema =
+
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+ for (Symbol orderBy : sortNode.getOrderingScheme().getOrderBy()) {
+ if (!tableColumnSchema.containsKey(orderBy)
+ || tableColumnSchema.get(orderBy).getColumnCategory()
+ == TsTableColumnCategory.MEASUREMENT) {
+ return sortNode;
+ } else if (tableColumnSchema.get(orderBy).getColumnCategory()
+ == TsTableColumnCategory.TIME) {
+ break;
+ } else {
+ sortSymbolsBeforeTime.add(orderBy);
+ }
+ }
+
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableColumnSchema.entrySet()) {
+ if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+ && !sortSymbolsBeforeTime.contains(entry.getKey())) {
+ return sortNode;
+ }
+ }
+
+ return child;
+ }
+ }
+
+ private static class Context {
+ private TableScanNode tableScanNode;
+
+ public TableScanNode getTableScanNode() {
+ return tableScanNode;
+ }
+
+ public void setTableScanNode(TableScanNode tableScanNode) {
+ this.tableScanNode = tableScanNode;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
new file mode 100644
index 00000000000..9ea378df087
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.Map;
+
+/**
+ * <b>Optimization phase:</b> Logical plan planning.
+ *
+ * <p>This optimization rule implements the rule below.
+ * <li>When the sort order is `IDColumns,Time` or `IDColumns,Others` in
SortNode, SortNode can be
+ * transformed to StreamSortNode.
+ */
+public class TransformSortToStreamSort implements PlanOptimizer {
+
+ @Override
+ public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+ if (!context.getAnalysis().hasSortNode()) {
+ return plan;
+ }
+
+ return plan.accept(
+ new Rewriter(context.getAnalysis(), context.getQueryContext()), new
Context());
+ }
+
+ private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+ private final Analysis analysis;
+ private final MPPQueryContext queryContext;
+
+ public Rewriter(Analysis analysis, MPPQueryContext queryContext) {
+ this.analysis = analysis;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public PlanNode visitPlan(PlanNode node, Context context) {
+ PlanNode newNode = node.clone();
+ for (PlanNode child : node.getChildren()) {
+ newNode.addChild(child.accept(this, context));
+ }
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitSort(SortNode node, Context context) {
+ PlanNode child = node.getChild().accept(this, context);
+ TableScanNode tableScanNode = context.getTableScanNode();
+ Map<Symbol, ColumnSchema> tableColumnSchema =
+
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+
+ OrderingScheme orderingScheme = node.getOrderingScheme();
+ int streamSortIndex = 0;
+ for (Symbol orderBy : orderingScheme.getOrderBy()) {
+ if (!tableColumnSchema.containsKey(orderBy)
+ || tableColumnSchema.get(orderBy).getColumnCategory()
+ == TsTableColumnCategory.MEASUREMENT
+ || tableColumnSchema.get(orderBy).getColumnCategory() ==
TsTableColumnCategory.TIME) {
+ break;
+ } else {
+ streamSortIndex++;
+ }
+ }
+
+ if (streamSortIndex > 0) {
+ return new StreamSortNode(
+ queryContext.getQueryId().genPlanNodeId(),
+ child,
+ node.getOrderingScheme(),
+ node.isPartial(),
+ streamSortIndex);
+ }
+
+ return node;
+ }
+
+ @Override
+ public PlanNode visitTableScan(TableScanNode node, Context context) {
+ context.setTableScanNode(node);
+ return node;
+ }
+ }
+
+ private static class Context {
+ private TableScanNode tableScanNode;
+
+ public TableScanNode getTableScanNode() {
+ return tableScanNode;
+ }
+
+ public void setTableScanNode(TableScanNode tableScanNode) {
+ this.tableScanNode = tableScanNode;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index ba8b20794a3..c612a697540 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -101,7 +102,7 @@ public class LimitOffsetPushDownTest {
}
// order by all tags, limit can be pushed into TableScan,
pushLimitToEachDevice==false
- // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
+ // Output - Limit - Offset - Project - MergeSort - Project - TableScan
@Test
public void orderByAllTagsTest() {
sql =
@@ -121,9 +122,9 @@ public class LimitOffsetPushDownTest {
(MergeSortNode)
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
5);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+ tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 1);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(DESC, tableScanNode.getScanOrder());
assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
@@ -131,7 +132,7 @@ public class LimitOffsetPushDownTest {
tableScanNode =
(TableScanNode)
-
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
3);
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
2);
assertEquals(2, tableScanNode.getDeviceEntries().size());
assertEquals(DESC, tableScanNode.getScanOrder());
assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
@@ -250,7 +251,7 @@ public class LimitOffsetPushDownTest {
assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
}
- private PlanNode getChildrenNode(PlanNode root, int idx) {
+ static PlanNode getChildrenNode(PlanNode root, int idx) {
PlanNode result = root;
for (int i = 1; i <= idx; i++) {
result = result.getChildren().get(0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index 1fcbc31ed98..3ca16e80040 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.junit.Test;
@@ -49,6 +50,7 @@ import java.util.Arrays;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.LimitOffsetPushDownTest.getChildrenNode;
import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.DESC;
import static org.junit.Assert.assertEquals;
@@ -72,6 +74,10 @@ public class SortTest {
LogicalPlanner logicalPlanner;
LogicalQueryPlan logicalQueryPlan;
PlanNode rootNode;
+ OutputNode outputNode;
+ PlanNode mergeSortNode;
+ ProjectNode projectNode;
+ StreamSortNode streamSortNode;
TableDistributionPlanner distributionPlanner;
DistributedQueryPlan distributedQueryPlan;
TableScanNode tableScanNode;
@@ -694,73 +700,40 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode -
ProjectNode - FilterNode -
- // TableScanNode
+ // LogicalPlan:
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
assertTrue(rootNode instanceof OutputNode);
- assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
- rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- SortNode sortNode =
- (SortNode)
- rootNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- assertTrue(
- sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof TableScanNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+ streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
assertEquals(8, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode -
SortNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // DistributePlan:
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- assertTrue(
-
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
- instanceof OutputNode);
- OutputNode outputNode =
+ outputNode =
(OutputNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
- assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
-
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- MergeSortNode mergeSortNode =
- (MergeSortNode)
- outputNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+ MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode,
4);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- sortNode = (SortNode) mergeSortNode.getChildren().get(1);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList(
@@ -773,48 +746,13 @@ public class SortTest {
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
- // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
- assertTrue(
- distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
- assertTrue(
-
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
- instanceof SortNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof ProjectNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof FilterNode);
- tableScanNode =
- (TableScanNode)
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ // DistributePlan: `IdentitySink - StreamSort - Project - Filter -
TableScan`
+ streamSortNode =
+ (StreamSortNode)
+
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(2, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -837,70 +775,39 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode -
ProjectNode - FilterNode -
- // TableScanNode
+ // LogicalPlan:
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
assertTrue(rootNode instanceof OutputNode);
- assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
- rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- SortNode sortNode =
- (SortNode)
- rootNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+ StreamSortNode streamSortNode = (StreamSortNode) getChildrenNode(rootNode,
4);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
assertEquals(8, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode -
SortNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // DistributePlan:
`Output-Limit-Offset-Project-MergeSort-Project-Filter-TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- assertTrue(
-
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
- instanceof OutputNode);
- OutputNode outputNode =
+ outputNode =
(OutputNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
- assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
-
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- MergeSortNode mergeSortNode =
- (MergeSortNode)
- outputNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+ MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode,
4);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- sortNode = (SortNode) mergeSortNode.getChildren().get(1);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ projectNode = (ProjectNode) mergeSortNode.getChildren().get(1);
+ assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList(
@@ -913,48 +820,12 @@ public class SortTest {
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
- // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
- assertTrue(
- distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
- assertTrue(
-
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
- instanceof SortNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof ProjectNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof FilterNode);
- tableScanNode =
- (TableScanNode)
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ // DistributePlan: `IdentitySink-Project-Filter-TableScan`
+ projectNode =
+ (ProjectNode)
+
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+ assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
assertEquals(2, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -977,73 +848,41 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode -
ProjectNode - FilterNode -
- // TableScanNode
+ // LogicalPlan:
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
assertTrue(rootNode instanceof OutputNode);
- assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
- rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- SortNode sortNode =
- (SortNode)
- rootNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+ streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+ assertEquals(2, streamSortNode.getStreamCompareKeyEndIndex());
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
assertEquals(8, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode -
SortNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // DistributePlan:
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- assertTrue(
-
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
- instanceof OutputNode);
- OutputNode outputNode =
+ outputNode =
(OutputNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
- assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
-
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- MergeSortNode mergeSortNode =
- (MergeSortNode)
- outputNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+ MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode,
4);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- sortNode = (SortNode) mergeSortNode.getChildren().get(1);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- assertTrue(
- sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof TableScanNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList(
@@ -1056,48 +895,13 @@ public class SortTest {
.collect(Collectors.toList()));
assertEquals(ASC, tableScanNode.getScanOrder());
- // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
- assertTrue(
- distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
- assertTrue(
-
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
- instanceof SortNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof ProjectNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof FilterNode);
- tableScanNode =
- (TableScanNode)
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ // DistributePlan: `IdentitySink - StreamSort - Project - Filter -
TableScan`
+ streamSortNode =
+ (StreamSortNode)
+
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(2, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -1120,70 +924,48 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode -
ProjectNode - FilterNode -
- // TableScanNode
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+
+ // LogicalPlan:
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
assertTrue(rootNode instanceof OutputNode);
- assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
- rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- SortNode sortNode =
- (SortNode)
- rootNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+ streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+ assertEquals(3, streamSortNode.getStreamCompareKeyEndIndex());
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
assertEquals(8, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode -
SortNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // DistributePlan:
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- assertTrue(
-
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
- instanceof OutputNode);
- OutputNode outputNode =
+ outputNode =
(OutputNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
- assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
- assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
- assertTrue(
-
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
- instanceof ProjectNode);
- MergeSortNode mergeSortNode =
- (MergeSortNode)
- outputNode
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+ MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode,
4);
assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
- assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
- sortNode = (SortNode) mergeSortNode.getChildren().get(1);
- assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
- assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
- tableScanNode =
- (TableScanNode)
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+ streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(4, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList(
@@ -1196,48 +978,13 @@ public class SortTest {
.collect(Collectors.toList()));
assertEquals(ASC, tableScanNode.getScanOrder());
- // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode -
TableScanNode
- assertTrue(
- distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
- assertTrue(
-
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
- instanceof SortNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof ProjectNode);
- assertTrue(
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- instanceof FilterNode);
- tableScanNode =
- (TableScanNode)
- distributedQueryPlan
- .getFragments()
- .get(1)
- .getPlanNodeTree()
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0)
- .getChildren()
- .get(0);
+ // DistributePlan: `IdentitySink - StreamSort - Project - Filter -
TableScan`
+ streamSortNode =
+ (StreamSortNode)
+
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+ assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+ tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
assertEquals(2, tableScanNode.getDeviceEntries().size());
assertEquals(
Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),