This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new f09cfe11625 Add LimitOffsetPushDown optimize rule
f09cfe11625 is described below
commit f09cfe1162553c9515586c7b5f1c01720ae9e9db
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 9 19:48:52 2024 +0800
Add LimitOffsetPushDown optimize rule
---
.../plan/relational/analyzer/Analysis.java | 15 ++
.../plan/relational/planner/QueryPlanner.java | 14 +-
.../plan/relational/planner/RelationPlanner.java | 16 +-
.../plan/relational/planner/TranslationMap.java | 27 +-
.../distribute/TableDistributionPlanner.java | 10 +
.../plan/relational/planner/node/CollectNode.java | 5 +-
.../relational/planner/node/TableScanNode.java | 12 +
.../planner/optimizations/LimitOffsetPushDown.java | 268 ++++++++++++++++++++
.../optimizations/PushPredicateIntoTableScan.java | 30 ++-
.../planner/optimizations/SimplifyExpressions.java | 1 +
.../analyzer/LimitOffsetPushDownTest.java | 273 +++++++++++++++++++++
.../plan/relational/analyzer/SortTest.java | 34 ++-
.../plan/relational/analyzer/TestPlanBuilder.java | 98 ++++++++
13 files changed, 743 insertions(+), 60 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 25a8e6edc7f..f913ddbf98e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -30,8 +30,10 @@ import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySour
import
org.apache.iotdb.db.queryengine.plan.execution.memory.TableModelStatementMemorySourceContext;
import
org.apache.iotdb.db.queryengine.plan.execution.memory.TableModelStatementMemorySourceVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.security.Identity;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns;
@@ -68,6 +70,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -154,6 +157,9 @@ public class Analysis implements IAnalysis {
private final Set<NodeRef<Relation>> aliasedRelations = new
LinkedHashSet<>();
+ private final Map<QualifiedObjectName, Map<Symbol, ColumnSchema>>
tableColumnSchemas =
+ new HashMap<>();
+
private DataPartition dataPartition;
// only be used in write plan and won't be used in query
@@ -566,6 +572,15 @@ public class Analysis implements IAnalysis {
return aliasedRelations.contains(NodeRef.of(relation));
}
+ public void addTableSchema(
+ QualifiedObjectName qualifiedObjectName, Map<Symbol, ColumnSchema>
tableColumnSchema) {
+ tableColumnSchemas.put(qualifiedObjectName, tableColumnSchema);
+ }
+
+ public Map<Symbol, ColumnSchema> getTableColumnSchema(QualifiedObjectName
qualifiedObjectName) {
+ return tableColumnSchemas.get(qualifiedObjectName);
+ }
+
public boolean hasValueFilter() {
return hasValueFilter;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index 60e2a46371d..8456c03a52f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -91,7 +91,7 @@ public class QueryPlanner {
.collect(toImmutableList());
List<Expression> orderBy = analysis.getOrderByExpressions(query);
- if (orderBy.size() > 0) {
+ if (!orderBy.isEmpty()) {
builder =
builder.appendProjections(
Iterables.concat(orderBy, outputs), symbolAllocator,
queryContext);
@@ -134,7 +134,8 @@ public class QueryPlanner {
// Add projections for aggregations required by ORDER BY. After this
step, grouping keys and
// translated
// aggregations are visible.
- List<Expression> orderByAggregates =
analysis.getOrderByAggregates(node.getOrderBy().get());
+ List<Expression> orderByAggregates =
+ analysis.getOrderByAggregates(node.getOrderBy().orElse(null));
builder = builder.appendProjections(orderByAggregates,
symbolAllocator, queryContext);
}
@@ -146,16 +147,15 @@ public class QueryPlanner {
// The new scope is the composite of the fields from the FROM and SELECT
clause (local nested
// scopes). Fields from the bottom of
// the scope stack need to be placed first to match the expected layout
for nested scopes.
- List<Symbol> newFields = new ArrayList<>();
- newFields.addAll(builder.getTranslations().getFieldSymbolsList());
+ List<Symbol> newFields = new
ArrayList<>(builder.getTranslations().getFieldSymbolsList());
outputs.stream().map(builder::translate).forEach(newFields::add);
- builder = builder.withScope(analysis.getScope(node.getOrderBy().get()),
newFields);
+ builder =
builder.withScope(analysis.getScope(node.getOrderBy().orElse(null)), newFields);
}
List<Expression> orderBy = analysis.getOrderByExpressions(node);
- if (orderBy.size() > 0) {
+ if (!orderBy.isEmpty()) {
builder =
builder.appendProjections(
Iterables.concat(orderBy, outputs), symbolAllocator,
queryContext);
@@ -220,7 +220,7 @@ public class QueryPlanner {
if (node.getFrom().isPresent()) {
RelationPlan relationPlan =
new RelationPlanner(analysis, symbolAllocator, queryContext,
session, recursiveSubqueries)
- .process(node.getFrom().get(), null);
+ .process(node.getFrom().orElse(null), null);
return newPlanBuilder(relationPlan, analysis);
} else {
throw new IllegalStateException("From clause must not by empty");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index d85fcc9056c..8b974707298 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -125,16 +125,19 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
throw new IllegalStateException("Table " + table.getName() + " has no
prefix!");
}
+ QualifiedObjectName qualifiedObjectName =
+ new QualifiedObjectName(
+
qualifiedName.getPrefix().map(QualifiedName::toString).orElse(null),
+ qualifiedName.getSuffix());
+ Map<Symbol, ColumnSchema> tableColumnSchema = symbolToColumnSchema.build();
+ analysis.addTableSchema(qualifiedObjectName, tableColumnSchema);
TableScanNode tableScanNode =
new TableScanNode(
idAllocator.genPlanNodeId(),
- new QualifiedObjectName(
-
qualifiedName.getPrefix().map(QualifiedName::toString).orElse(null),
- qualifiedName.getSuffix()),
+ qualifiedObjectName,
outputSymbols,
- symbolToColumnSchema.build(),
+ tableColumnSchema,
idAndAttributeIndexMap);
-
return new RelationPlan(tableScanNode, scope, outputSymbols);
// Collection<Field> fields =
analysis.getMaterializedViewStorageTableFields(node);
@@ -156,7 +159,6 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
throw new IllegalStateException("Unsupported node type: " +
node.getClass().getName());
}
- // ================================ Implemented later
=====================================
@Override
protected RelationPlan visitTableSubquery(TableSubquery node, Void context) {
RelationPlan plan = process(node.getQuery(), context);
@@ -164,6 +166,8 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
return new RelationPlan(plan.getRoot(), analysis.getScope(node),
plan.getFieldMappings());
}
+ // ================================ Implemented later
=====================================
+
@Override
protected RelationPlan visitValues(Values node, Void context) {
throw new IllegalStateException("Values is not supported in current
version.");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
index ea1a78a5573..aa29710a20b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
@@ -226,18 +226,16 @@ public class TranslationMap {
public Expression rewriteFieldReference(
FieldReference node, Void context, ExpressionTreeRewriter<Void>
treeRewriter) {
Optional<SymbolReference> mapped = tryGetMapping(node);
- if (mapped.isPresent()) {
- return mapped.get();
- }
-
- return getSymbolForColumn(node)
- .map(Symbol::toSymbolReference)
- .orElseThrow(
- () ->
- new IllegalStateException(
- format(
- "No symbol mapping for node '%s' (%s)",
- node, node.getFieldIndex())));
+ return mapped.orElseGet(
+ () ->
+ getSymbolForColumn(node)
+ .map(Symbol::toSymbolReference)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ format(
+ "No symbol mapping for node '%s' (%s)",
+ node, node.getFieldIndex()))));
}
@Override
@@ -266,7 +264,7 @@ public class TranslationMap {
List<Expression> newArguments =
node.getArguments().stream()
- .map(argument -> rewrite(argument))
+ .map(TranslationMap.this::rewrite)
.collect(Collectors.toList());
return new FunctionCall(node.getName(), node.isDistinct(),
newArguments);
}
@@ -323,8 +321,7 @@ public class TranslationMap {
private static void verifyAstExpression(Expression astExpression) {
verify(
- AstUtil.preOrder(astExpression)
- .noneMatch(expression -> expression instanceof SymbolReference),
+
AstUtil.preOrder(astExpression).noneMatch(SymbolReference.class::isInstance),
"symbol references are not allowed");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 70a87f06dec..b47eb51db94 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -27,6 +27,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LimitOffsetPushDown;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TablePlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
import java.util.Collections;
@@ -42,12 +44,14 @@ public class TableDistributionPlanner {
private final Analysis analysis;
private final LogicalQueryPlan logicalQueryPlan;
private final MPPQueryContext mppQueryContext;
+ private final List<TablePlanOptimizer> optimizers;
public TableDistributionPlanner(
Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext
mppQueryContext) {
this.analysis = analysis;
this.logicalQueryPlan = logicalQueryPlan;
this.mppQueryContext = mppQueryContext;
+ this.optimizers = Collections.singletonList(new LimitOffsetPushDown());
}
public DistributedQueryPlan plan() {
@@ -59,6 +63,12 @@ public class TableDistributionPlanner {
.genResult(logicalQueryPlan.getRootNode(), planContext);
checkArgument(distributedPlanResult.size() == 1, "Root node must return
only one");
+ // distribute plan optimize rule
+ this.optimizers.forEach(
+ optimizer ->
+ optimizer.optimize(
+ distributedPlanResult.get(0), analysis, null, null,
mppQueryContext));
+
// add exchange node for distributed plan
PlanNode outputNodeWithExchange =
new AddExchangeNodes(mppQueryContext)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
index 0dc0a4cb570..7920216428d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@ -33,7 +33,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-/** CollectNode output the content of children. */
+/**
+ * CollectNode output the content of children. Normally it will output the
child one by one, but in
+ * some cases, while some children are blocked, it may output the content of
other children.
+ */
public class CollectNode extends MultiChildProcessNode {
public CollectNode(PlanNodeId id) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index c4283bb654a..9207bfe5512 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -377,14 +377,26 @@ public class TableScanNode extends SourceNode {
return this.pushDownLimit;
}
+ public void setPushDownLimit(long pushDownLimit) {
+ this.pushDownLimit = pushDownLimit;
+ }
+
public long getPushDownOffset() {
return this.pushDownOffset;
}
+ public void setPushDownOffset(long pushDownOffset) {
+ this.pushDownOffset = pushDownOffset;
+ }
+
public Expression getPushDownPredicate() {
return this.pushDownPredicate;
}
+ public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
+ }
+
public void setPushDownPredicate(@Nullable Expression pushDownPredicate) {
this.pushDownPredicate = pushDownPredicate;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
new file mode 100644
index 00000000000..4bf01ac1f72
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
+import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning.
+ *
+ * <p>The LIMIT OFFSET condition can be pushed down to the TableScanNode when the following
+ * conditions are met:
+ *
+ * <ul>
+ * <li>Time series query (not aggregation query).
+ * <li>The query expressions are all scalar expressions.
+ * <li>Order by all IDs: limit can be pushed down, with pushLimitToEachDevice == false.
+ * <li>Order by some IDs or order by time: limit can be pushed down, with
+ * pushLimitToEachDevice == true.
+ * </ul>
+ */
+public class LimitOffsetPushDown implements TablePlanOptimizer {
+
+ @Override
+ public PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context) {
+ if (!(analysis.getStatement() instanceof Query)) {
+ return planNode;
+ }
+
+ return planNode.accept(new Rewriter(), new Context(analysis));
+ }
+
+ private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+
+ @Override
+ public PlanNode visitPlan(PlanNode node, Context context) {
+ PlanNode newNode = node.clone();
+ for (PlanNode child : node.getChildren()) {
+ newNode.addChild(child.accept(this, context));
+ }
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitOutput(OutputNode node, Context context) {
+ return visitPlan(node, context);
+ }
+
+ @Override
+ public PlanNode visitLimit(LimitNode node, Context context) {
+ context.setLimit(node.getCount());
+ node.setChild(node.getChild().accept(this, context));
+ return node;
+ }
+
+ @Override
+ public PlanNode visitOffset(OffsetNode node, Context context) {
+ context.setOffset(node.getCount());
+ if (context.getLimit() > 0) {
+ context.setLimit(context.getLimit() + context.getOffset());
+ }
+ node.setChild(node.getChild().accept(this, context));
+ return node;
+ }
+
+ @Override
+ public PlanNode visitCollect(CollectNode node, Context context) {
+ PlanNode newNode = node.clone();
+ for (PlanNode child : node.getChildren()) {
+ newNode.addChild(child.accept(this, context));
+ }
+ return newNode;
+ }
+
+ @Override
+ public PlanNode visitProject(ProjectNode node, Context context) {
+ for (Expression expression : node.getAssignments().getMap().values()) {
+ if (containsDiffFunction(expression)) {
+ context.setEnablePushDown(false);
+ return node;
+ }
+ }
+ return visitPlan(node, context);
+ }
+
+ @Override
+ public PlanNode visitSort(SortNode node, Context context) {
+ Context newContext =
+ new Context(
+ context.analysis,
+ context.getLimit(),
+ context.getOffset(),
+ context.isEnablePushDown(),
+ context.canPushLimitToEachDevice());
+ PlanNode child = node.getChild().accept(this, newContext);
+ if (!newContext.isEnablePushDown()) {
+ return node;
+ }
+
+ OrderingScheme orderingScheme = node.getOrderingScheme();
+ TableScanNode tableScanNode = newContext.getTableScanNode();
+ Map<Symbol, ColumnSchema> tableColumnSchema =
+
context.getAnalysis().getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+ Set<Symbol> sortSymbols = new HashSet<>();
+ for (Symbol orderBy : orderingScheme.getOrderBy()) {
+ if (TIMESTAMP_STR.equalsIgnoreCase(orderBy.getName())) {
+ break;
+ }
+
+ // order by measurement or expression, can not push down limit
+ if (!tableColumnSchema.containsKey(orderBy)
+ || tableColumnSchema.get(orderBy).getColumnCategory()
+ == TsTableColumnCategory.MEASUREMENT) {
+ tableScanNode.setPushDownLimit(0);
+ tableScanNode.setPushDownOffset(0);
+ return node;
+ }
+
+ sortSymbols.add(orderBy);
+ }
+
+ boolean pushLimitToEachDevice = false;
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableColumnSchema.entrySet()) {
+ if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+ && !sortSymbols.contains(entry.getKey())) {
+ pushLimitToEachDevice = true;
+ break;
+ }
+ }
+ tableScanNode.setPushLimitToEachDevice(pushLimitToEachDevice);
+ node.setChild(child);
+ return node;
+ }
+
+ @Override
+ public PlanNode visitFilter(FilterNode node, Context context) {
+ // If there is still a FilterNode here, it means that there are read
filter conditions that
+ // cannot be pushed
+ // down to TableScan.
+ context.setEnablePushDown(false);
+ return node;
+ }
+
+ @Override
+ public PlanNode visitTableScan(TableScanNode node, Context context) {
+ context.setTableScanNode(node);
+ if (context.getLimit() > 0) {
+ node.setPushDownLimit(context.getLimit());
+ }
+ // TODO only one data region, pushDownOffset can be set
+ // if (context.getOffset() > 0) {
+ // node.setPushDownOffset(context.getOffset());
+ // }
+ if (context.canPushLimitToEachDevice()) {
+ node.setPushLimitToEachDevice(true);
+ }
+ return node;
+ }
+ }
+
+ private static class Context {
+ private final Analysis analysis;
+ private long limit;
+ private long offset;
+ private boolean enablePushDown = true;
+ private boolean pushLimitToEachDevice = false;
+ private TableScanNode tableScanNode;
+
+ public Context(Analysis analysis) {
+ this.analysis = analysis;
+ }
+
+ public Context(
+ Analysis analysis,
+ long limit,
+ long offset,
+ boolean enablePushDown,
+ boolean pushLimitToEachDevice) {
+ this.analysis = analysis;
+ this.limit = limit;
+ this.offset = offset;
+ this.enablePushDown = enablePushDown;
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
+ }
+
+ public Analysis getAnalysis() {
+ return analysis;
+ }
+
+ public long getLimit() {
+ return limit;
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public boolean isEnablePushDown() {
+ return enablePushDown;
+ }
+
+ public void setEnablePushDown(boolean enablePushDown) {
+ this.enablePushDown = enablePushDown;
+ }
+
+ public boolean canPushLimitToEachDevice() {
+ return pushLimitToEachDevice;
+ }
+
+ /** Sets the flag that is reported by {@link #canPushLimitToEachDevice()}. */
+ public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
+ }
+
+ public TableScanNode getTableScanNode() {
+ return tableScanNode;
+ }
+
+ public void setTableScanNode(TableScanNode tableScanNode) {
+ this.tableScanNode = tableScanNode;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 98b10ab23ce..63d55214779 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -55,21 +55,25 @@ import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTim
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.extractGlobalTimeFilter;
/**
- * After the optimized rule {@link SimplifyExpressions} finished, predicate
expression in FilterNode
- * has been transformed to conjunctive normal forms(CNF).
+ * <b>Optimization phase:</b> Logical plan planning.
*
- * <p>1. In this class, we examine each expression in CNFs, determine how to
use it, in metadata
- * query, or pushed down into ScanOperators, or it can only be used in
FilterNode above with
- * TableScanNode.
- * <li>For metadata query expressions, it will be used in {@code
tableIndexScan} method to generate
- * the deviceEntries and DataPartition used for TableScanNode.
- * <li>For expressions which can be pushed into TableScanNode, we will execute
{@code
- * extractGlobalTimeFilter}, to extract the timePredicate and
pushDownValuePredicate.
- * <li>Expression which can not be pushed down into TableScanNode, will be
used in the FilterNode
- * above of TableScanNode.
+ * <p>After the optimized rule {@link SimplifyExpressions} finished, predicate
expression in
+ * FilterNode has been transformed to conjunctive normal forms(CNF).
*
- * <p>Notice that, when aggregation, multi-table, join are introduced,
this optimization rule
- * need to be adapted.
+ * <p>In this class, we examine each expression in CNFs, determine how to use
it, in metadata query,
+ * or pushed down into ScanOperators, or it can only be used in FilterNode
above with TableScanNode.
+ *
+ * <ul>
+ * <li>For metadata query expressions, it will be used in {@code
tableIndexScan} method to
+ * generate the deviceEntries and DataPartition used for TableScanNode.
+ * <li>For expressions which can be pushed into TableScanNode, we will
execute {@code
+ * extractGlobalTimeFilter}, to extract the timePredicate and
pushDownValuePredicate.
+ * <li>Expression which can not be pushed down into TableScanNode, will be
used in the FilterNode
+ * above of TableScanNode.
+ * </ul>
+ *
+ * <p>Notice that, when aggregation, multi-table, join are introduced, this
optimization rule need
+ * to be adapted.
*/
public class PushPredicateIntoTableScan implements TablePlanOptimizer {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
index e6bf1ee22c2..2b0256ee2d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExtractCommonPredicatesExpressionRewriter.extractCommonPredicates;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.NormalizeOrExpressionRewriter.normalizeOrExpression;
+/** <b>Optimization phase:</b> Logical plan planning. */
public class SimplifyExpressions implements TablePlanOptimizer {
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
new file mode 100644
index 00000000000..3cdaa96bb16
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SimplifyExpressions;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TablePlanOptimizer;
+
+import org.junit.Test;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
+import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.DESC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LimitOffsetPushDownTest {
+ QueryId queryId = new QueryId("test_query");
+ SessionInfo sessionInfo =
+ new SessionInfo(
+ 1L,
+ "iotdb-user",
+ ZoneId.systemDefault(),
+ IoTDBConstant.ClientVersion.V_1_0,
+ "db",
+ IClientSession.SqlDialect.TABLE);
+ Metadata metadata = new TestMatadata();
+ String sql;
+ Analysis actualAnalysis;
+ MPPQueryContext context;
+ LogicalPlanner logicalPlanner;
+ LogicalQueryPlan logicalQueryPlan;
+ PlanNode rootNode;
+ TableDistributionPlanner distributionPlanner;
+ DistributedQueryPlan distributedQueryPlan;
+ TableScanNode tableScanNode;
+ List<TablePlanOptimizer> planOptimizerList =
+ Arrays.asList(
+ new SimplifyExpressions(),
+ new PruneUnUsedColumns(),
+ new PushPredicateIntoTableScan(),
+ new RemoveRedundantIdentityProjections());
+
+ // without sort operation, limit can be pushed into TableScan,
pushLimitToEachDevice==false
+ // Output - Project - Limit - Offset - Collect - TableScan
+ @Test
+ public void noOrderByTest() {
+ sql = "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 offset
5 limit 10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 4) instanceof TableScanNode);
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ CollectNode collectNode =
+ (CollectNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
5);
+ assertTrue(collectNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(collectNode.getChildren().get(1) instanceof TableScanNode);
+ assertTrue(collectNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode) collectNode.getChildren().get(1);
+ assertEquals(4, tableScanNode.getDeviceEntries().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertFalse(tableScanNode.isPushLimitToEachDevice());
+
+ tableScanNode =
+ (TableScanNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
1);
+ assertEquals(2, tableScanNode.getDeviceEntries().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertFalse(tableScanNode.isPushLimitToEachDevice());
+ }
+
+ // order by all tags, limit can be pushed into TableScan,
pushLimitToEachDevice==false
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+ @Test
+ public void orderByAllTagsTest() {
+ sql =
+ "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by
tag2 desc, tag1 asc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit
10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ MergeSortNode mergeSortNode =
+ (MergeSortNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+ assertEquals(4, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertFalse(tableScanNode.isPushLimitToEachDevice());
+
+ tableScanNode =
+ (TableScanNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
4);
+ assertEquals(2, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertFalse(tableScanNode.isPushLimitToEachDevice());
+ }
+
+ // order by some tags, limit can be pushed into TableScan,
pushLimitToEachDevice==true
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+ @Test
+ public void orderBySomeTagsTest() {
+ sql =
+ "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by
tag2 desc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit 10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ MergeSortNode mergeSortNode =
+ (MergeSortNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+ assertEquals(4, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertTrue(tableScanNode.isPushLimitToEachDevice());
+
+ tableScanNode =
+ (TableScanNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
4);
+ assertEquals(2, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertTrue(tableScanNode.isPushLimitToEachDevice());
+ }
+
+ // order by time, limit can be pushed into TableScan,
pushLimitToEachDevice==true
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+ @Test
+ public void orderByTimeTest() {
+ sql =
+ "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by
time desc, tag2 asc, s1+s3 asc offset 5 limit 10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ MergeSortNode mergeSortNode =
+ (MergeSortNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+ assertEquals(4, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertTrue(tableScanNode.isPushLimitToEachDevice());
+
+ tableScanNode =
+ (TableScanNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
4);
+ assertEquals(2, tableScanNode.getDeviceEntries().size());
+ assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 15 &&
tableScanNode.getPushDownOffset() == 0);
+ assertTrue(tableScanNode.isPushLimitToEachDevice());
+ }
+
+ // order by others, limit can not be pushed into TableScan
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+ @Test
+ public void orderByOthersTest() {
+ sql =
+ "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by
s1 desc, tag2 desc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit
10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ MergeSortNode mergeSortNode =
+ (MergeSortNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(),
4);
+ assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+ assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+ tableScanNode = (TableScanNode)
getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+ assertEquals(4, tableScanNode.getDeviceEntries().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
+
+ tableScanNode =
+ (TableScanNode)
+
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(),
4);
+ assertEquals(2, tableScanNode.getDeviceEntries().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
+ }
+
+ private PlanNode getChildrenNode(PlanNode root, int idx) {
+ PlanNode result = root;
+ for (int i = 1; i <= idx; i++) {
+ result = result.getChildren().get(0);
+ }
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index b5a71999a3e..aa539c1cf19 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -122,7 +122,7 @@ public class SortTest {
@Test
public void timeOthersSomeIDColumnSortTest() {
sql =
- "SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "SELECT time, tag3, substring(tag1, 1), cast(s2 as double), s2+s3,
attr1 FROM table1 "
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time
desc, s1+s2 asc, tag2 asc, tag1 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
actualAnalysis = analyzeSQL(sql, metadata);
@@ -131,8 +131,6 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - SortNode - ProjectNode -
ProjectNode - FilterNode -
- // TableScanNode
assertTrue(rootNode instanceof OutputNode);
assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
@@ -141,7 +139,7 @@ public class SortTest {
instanceof SortNode);
SortNode sortNode =
(SortNode)
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
- // TODO(beyyes) if these two ProjectNode can be merged into one?
+ // TODO(beyyes) merge parent and child ProjectNode into one, parent
contains all children
assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof
ProjectNode);
assertTrue(
@@ -162,7 +160,7 @@ public class SortTest {
.getChildren()
.get(0);
assertEquals("testdb.table1",
tableScanNode.getQualifiedObjectName().toString());
- // TODO(beyyes) fix the case prune unUsed Columns when the child of
SortNode is ProjectNode
+ // TODO(beyyes) fix the case prune unused columns when the child of
SortNode is ProjectNode
// assertEquals(
// Arrays.asList("time", "tag1", "tag2", "attr1", "s1", "s2"),
// tableScanNode.getOutputColumnNames());
@@ -170,9 +168,7 @@ public class SortTest {
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
- // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode -
ProjectNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - Filter
- TableScan
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -228,8 +224,9 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
- // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode -
TableScanNode
+ // IdentitySink - Sort - Project - Project - Filter - TableScan
assertTrue(
distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
assertTrue(
@@ -298,6 +295,7 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
// TODO(beyyes) add only one device entry optimization and verifies
}
@@ -348,10 +346,9 @@ public class SortTest {
assertEquals(9, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
- // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode -
ProjectNode - ProjectNode -
- // FilterNode -
- // TableScanNode
+ // Output - Limit - Offset - MergeSort - Sort - Project - Project - Filter
- TableScan
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -407,8 +404,9 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
- // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode -
TableScanNode
+ // IdentitySink - Sort - Project - Project - Filter - TableScan
assertTrue(
distributedQueryPlan.getFragments().get(1).getPlanNodeTree()
instanceof IdentitySinkNode);
assertTrue(
@@ -477,6 +475,7 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
}
// order by time, some_ids, others
@@ -492,8 +491,7 @@ public class SortTest {
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
- // OutputNode - LimitNode - OffsetNode - SortNode - ProjectNode -
ProjectNode - FilterNode -
- // TableScanNode
+ // Output - Limit - Offset - Sort - Project - Project - Filter - TableScan
assertTrue(rootNode instanceof OutputNode);
assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
OffsetNode);
@@ -525,10 +523,8 @@ public class SortTest {
assertEquals(9, tableScanNode.getAssignments().size());
assertEquals(6, tableScanNode.getDeviceEntries().size());
assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
- // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode -
ProjectNode - ProjectNode -
- // FilterNode -
- // TableScanNode
distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -584,6 +580,7 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
// IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode -
TableScanNode
assertTrue(
@@ -654,6 +651,7 @@ public class SortTest {
.map(d -> d.getDeviceID().toString())
.collect(Collectors.toList()));
assertEquals(DESC, tableScanNode.getScanOrder());
+ assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
}
// order by time, all_ids, others
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
new file mode 100644
index 00000000000..bcf776321f0
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestPlanBuilder {
+
+ private PlanNode root;
+
+ public TestPlanBuilder() {}
+
+ public PlanNode getRoot() {
+ return root;
+ }
+
+ public TestPlanBuilder output(String id, List<String> columnNames,
List<Symbol> outputSymbols) {
+ this.root = new OutputNode(new PlanNodeId(id), this.root, columnNames,
outputSymbols);
+ return this;
+ }
+
+ public TestPlanBuilder limit(String id, long count) {
+ this.root = new LimitNode(new PlanNodeId(id), this.root, count, null);
+ return this;
+ }
+
+ public TestPlanBuilder offset(String id, long count) {
+ this.root = new OffsetNode(new PlanNodeId(id), this.root, count);
+ return this;
+ }
+
+ public TestPlanBuilder project(String id, Assignments assignments) {
+ this.root = new ProjectNode(new PlanNodeId(id), this.root, assignments);
+ return this;
+ }
+
+ public TestPlanBuilder filter(String id, Expression predicate) {
+ this.root = new FilterNode(new PlanNodeId(id), this.root, predicate);
+ return this;
+ }
+
+ public TestPlanBuilder tableScan(
+ String id,
+ QualifiedObjectName qualifiedObjectName,
+ List<Symbol> outputSymbols,
+ Map<Symbol, ColumnSchema> assignments,
+ List<DeviceEntry> deviceEntries,
+ Map<Symbol, Integer> idAndAttributeIndexMap,
+ Ordering scanOrder,
+ Expression timePredicate,
+ Expression pushDownPredicate) {
+ this.root =
+ new TableScanNode(
+ new PlanNodeId(id),
+ qualifiedObjectName,
+ outputSymbols,
+ assignments,
+ deviceEntries,
+ idAndAttributeIndexMap,
+ scanOrder,
+ timePredicate,
+ pushDownPredicate);
+ return this;
+ }
+}