This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/join
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/join by this push:
new 9f97f462353 add MetadataExpressionTransformForJoin for join operation
9f97f462353 is described below
commit 9f97f4623539a6d1d1ccfd8e30a31fc10380d135
Author: Beyyes <[email protected]>
AuthorDate: Tue Aug 20 17:46:53 2024 +0800
add MetadataExpressionTransformForJoin for join operation
---
.../plan/relational/planner/EqualityInference.java | 2 +-
.../plan/relational/planner/PredicateUtils.java | 217 ---------------------
.../distribute/TableDistributedPlanGenerator.java | 7 +
.../plan/relational/planner/ir/AstUtils.java | 48 -----
.../ir/MetadataExpressionTransformForJoin.java | 44 +++++
.../relational/planner/node/TableScanNode.java | 4 +
.../optimizations/PushPredicateIntoTableScan.java | 144 +++++++++++---
7 files changed, 168 insertions(+), 298 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/EqualityInference.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/EqualityInference.java
index 1ca72118c5c..8caf1246fc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/EqualityInference.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/EqualityInference.java
@@ -31,9 +31,9 @@ import java.util.stream.Stream;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.PredicateUtils.extractConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator.isDeterministic;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionNodeInliner.replaceExpression;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts;
/**
* Makes equality based inferences to rewrite Expressions and generate
equality sets in terms of
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
index 1c07128d62c..62531541c4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
@@ -29,11 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.tsfile.utils.Pair;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
@@ -199,217 +195,4 @@ public class PredicateUtils {
&& ((Identifier) timeExpression).getValue().equalsIgnoreCase(TIME)
&& valueExpression instanceof LongLiteral;
}
-
- // private static boolean checkBetweenConstantSatisfy(Expression e1,
Expression e2) {
- // return e1.isConstantOperand()
- // && e2.isConstantOperand()
- // && ((ConstantOperand) e1).getDataType() == TSDataType.INT64
- // && ((ConstantOperand) e2).getDataType() == TSDataType.INT64
- // && (Long.parseLong(((ConstantOperand) e1).getValueString())
- // <= Long.parseLong(((ConstantOperand) e2).getValueString()));
- // }
-
- /**
- * Check if the given expression contains time filter.
- *
- * @param predicate given expression
- * @return true if the given expression contains time filter
- */
- // public static boolean checkIfTimeFilterExist(Expression predicate) {
- // return new TimeFilterExistChecker().process(predicate, null);
- // }
-
- /**
- * Recursively removes all use of the not() operator in a predicate by
replacing all instances of
- * not(x) with the inverse(x),
- *
- * <p>eg: not(and(eq(), not(eq(y))) -> or(notEq(), eq(y))
- *
- * <p>The returned predicate should have the same meaning as the original,
but without the use of
- * the not() operator.
- *
- * <p>See also {@link PredicateUtils#reversePredicate(Expression)}, which is
used to do the
- * inversion.
- *
- * @param predicate the predicate to remove not() from
- * @return the predicate with all not() operators removed
- */
- // public static Expression predicateRemoveNot(Expression predicate) {
- // if (predicate.getExpressionType().equals(ExpressionType.LOGIC_AND)) {
- // return ExpressionFactory.and(
- // predicateRemoveNot(((BinaryExpression)
predicate).getLeftExpression()),
- // predicateRemoveNot(((BinaryExpression)
predicate).getRightExpression()));
- // } else if
(predicate.getExpressionType().equals(ExpressionType.LOGIC_OR)) {
- // return ExpressionFactory.or(
- // predicateRemoveNot(((BinaryExpression)
predicate).getLeftExpression()),
- // predicateRemoveNot(((BinaryExpression)
predicate).getRightExpression()));
- // } else if
(predicate.getExpressionType().equals(ExpressionType.LOGIC_NOT)) {
- // return reversePredicate(((LogicNotExpression)
predicate).getExpression());
- // }
- // return predicate;
- // }
-
- /**
- * Converts a predicate to its logical inverse. The returned predicate
should be equivalent to
- * not(p), but without the use of a not() operator.
- *
- * <p>See also {@link PredicateUtils#predicateRemoveNot(Expression)}, which
can remove the use of
- * all not() operators without inverting the overall predicate.
- *
- * @param predicate given predicate
- * @return the predicate after reversing
- */
- // public static Expression reversePredicate(Expression predicate) {
- // return new ReversePredicateVisitor().process(predicate, null);
- // }
-
- /**
- * Simplify the given predicate (Remove the NULL and TRUE/FALSE expression).
- *
- * @param predicate given predicate
- * @return the predicate after simplifying
- */
- // public static Expression simplifyPredicate(Expression predicate) {
- // return new PredicateSimplifier().process(predicate, null);
- // }
-
- /**
- * Convert the given predicate to time filter.
- *
- * <p>Note: the supplied predicate must not contain any instances of the
not() operator as this is
- * not supported by this filter. The supplied predicate should first be run
through {@link
- * PredicateUtils#predicateRemoveNot(Expression)} to rewrite it in a form
that doesn't make use of
- * the not() operator.
- *
- * @param predicate given predicate
- * @return the time filter converted from the given predicate
- */
- // public static Filter convertPredicateToTimeFilter(Expression predicate) {
- // if (predicate == null) {
- // return null;
- // }
- // return predicate.accept(new ConvertPredicateToTimeFilterVisitor(),
null);
- // }
-
- // public static Filter convertPredicateToFilter(
- // Expression predicate,
- // List<String> allMeasurements,
- // boolean isBuildPlanUseTemplate,
- // TypeProvider typeProvider) {
- // if (predicate == null) {
- // return null;
- // }
- // return predicate.accept(
- // new ConvertPredicateToFilterVisitor(),
- // new ConvertPredicateToFilterVisitor.Context(
- // allMeasurements, isBuildPlanUseTemplate, typeProvider));
- // }
-
- /**
- * Combine the given conjuncts into a single expression using "and".
- *
- * @param conjuncts given conjuncts
- * @return the expression combined by the given conjuncts
- */
- public static Expression combineConjuncts(List<Expression> conjuncts) {
- if (conjuncts.size() == 1) {
- return conjuncts.get(0);
- }
- return constructRightDeepTreeWithAnd(conjuncts);
- }
-
- private static Expression constructRightDeepTreeWithAnd(List<Expression>
conjuncts) {
- // TODO: consider other structures of tree
- if (conjuncts.size() == 2) {
- return and(conjuncts.get(0), conjuncts.get(1));
- } else {
- return and(
- conjuncts.get(0), constructRightDeepTreeWithAnd(conjuncts.subList(1,
conjuncts.size())));
- }
- }
-
- public static Expression removeDuplicateConjunct(Expression predicate) {
- if (predicate == null) {
- return null;
- }
- Set<Expression> conjuncts = new HashSet<>();
- extractConjuncts(predicate, conjuncts);
- return combineConjuncts(new ArrayList<>(conjuncts));
- }
-
- public static List<Expression> extractConjuncts(Expression predicate) {
- Set<Expression> conjuncts = new HashSet<>();
- extractConjuncts(predicate, conjuncts);
- return new ArrayList<>(conjuncts);
- }
-
- private static void extractConjuncts(Expression predicate, Set<Expression>
conjuncts) {
- if (predicate instanceof LogicalExpression
- && ((LogicalExpression) predicate).getOperator() == AND) {
- extractConjuncts(((LogicalExpression) predicate).getTerms().get(0),
conjuncts);
- extractConjuncts(((LogicalExpression) predicate).getTerms().get(1),
conjuncts);
- } else {
- conjuncts.add(predicate);
- }
- }
-
- /**
- * Extract the source symbol (full path for non-aligned path, device path
for aligned path) from
- * the given predicate. If the predicate contains multiple source symbols,
return null.
- *
- * @param predicate given predicate
- * @return the source symbol extracted from the given predicate
- */
- // public static PartialPath extractPredicateSourceSymbol(Expression
predicate) {
- // List<Expression> sourceExpressions =
ExpressionAnalyzer.searchSourceExpressions(predicate);
- // Set<PartialPath> sourcePaths =
- // sourceExpressions.stream()
- // .map(expression -> ((TimeSeriesOperand) expression).getPath())
- // .collect(Collectors.toSet());
- // Iterator<PartialPath> pathIterator = sourcePaths.iterator();
- // MeasurementPath firstPath = (MeasurementPath) pathIterator.next();
- //
- // if (sourcePaths.size() == 1) {
- // // only contain one source path, can be push down
- // return firstPath.isUnderAlignedEntity() ? firstPath.getDevicePath()
: firstPath;
- // }
- //
- // // sourcePaths contain more than one path, can be push down when
- // // these paths under on aligned device
- // if (!firstPath.isUnderAlignedEntity()) {
- // return null;
- // }
- // PartialPath checkedDevice = firstPath.getDevicePath();
- // while (pathIterator.hasNext()) {
- // MeasurementPath path = (MeasurementPath) pathIterator.next();
- // if (!path.isUnderAlignedEntity() ||
!path.getDevicePath().equals(checkedDevice)) {
- // return null;
- // }
- // }
- // return checkedDevice;
- // }
-
- /**
- * Check if the given predicate can be pushed down from FilterNode to
ScanNode.
- *
- * <p>The predicate <b>cannot</b> be pushed down if it satisfies the
following conditions:
- * <li>predicate contains IS_NULL
- *
- * @param predicate given predicate
- * @return true if the given predicate can be pushed down to source
- */
- // public static boolean predicateCanPushDownToSource(Expression predicate)
{
- // return new PredicateCanPushDownToSourceChecker().process(predicate,
null);
- // }
-
- /**
- * Check if the given predicate can be pushed into ScanOperator and execute
using the {@link
- * Filter} interface.
- *
- * @param predicate given predicate
- * @return true if the given predicate can be pushed into ScanOperator
- */
- // public static boolean predicateCanPushIntoScan(Expression predicate) {
- // return new PredicatePushIntoScanChecker().process(predicate, null);
- // }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 321585f5d12..f19b97f83e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
@@ -318,6 +319,12 @@ public class TableDistributedPlanGenerator
return resultNodeList;
}
+ @Override
+ public List<PlanNode> visitJoin(JoinNode node, PlanContext context) {
+ List<PlanNode> leftChildrenNodes = node.getLeftChild().accept(this,
context);
+ List<PlanNode> rightChildrenNodes = node.getRightChild().accept(this,
context);
+ }
+
@Override
public List<PlanNode> visitTableScan(TableScanNode node, PlanContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/AstUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/AstUtils.java
index 7e47072947c..41c909c4274 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/AstUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/AstUtils.java
@@ -1,17 +1,12 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.ir;
-import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
-import com.google.common.collect.ImmutableList;
import com.google.common.graph.SuccessorsFunction;
import com.google.common.graph.Traverser;
import java.util.List;
-import java.util.OptionalInt;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.stream.Stream;
import static com.google.common.collect.Streams.stream;
@@ -55,48 +50,5 @@ public final class AstUtils {
return true;
}
- /**
- * Computes a hash of the given AST by applying the provided subtree hasher
at each level.
- *
- * <p>If the hasher returns a non-empty {@link OptionalInt}, the value is
treated as the hash for
- * the subtree at that node. Otherwise, the hashes of its children are
computed and combined.
- */
- public static int treeHash(Node node, Function<Node, OptionalInt>
subtreeHasher) {
- OptionalInt hash = subtreeHasher.apply(node);
-
- if (hash.isPresent()) {
- return hash.getAsInt();
- }
-
- List<? extends Node> children = node.getChildren();
-
- int result = node.getClass().hashCode();
- for (Node element : children) {
- result = 31 * result + treeHash(element, subtreeHasher);
- }
-
- return result;
- }
-
- public static List<Expression> extractConjuncts(Expression expression) {
- ImmutableList.Builder<Expression> resultBuilder = ImmutableList.builder();
- extractPredicates(LogicalExpression.Operator.AND, expression,
resultBuilder);
- return resultBuilder.build();
- }
-
- private static void extractPredicates(
- LogicalExpression.Operator operator,
- Expression expression,
- ImmutableList.Builder<Expression> resultBuilder) {
- if (expression instanceof LogicalExpression
- && ((LogicalExpression) expression).getOperator() == operator) {
- for (Expression term : ((LogicalExpression) expression).getTerms()) {
- extractPredicates(operator, term, resultBuilder);
- }
- } else {
- resultBuilder.add(expression);
- }
- }
-
private AstUtils() {}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/MetadataExpressionTransformForJoin.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/MetadataExpressionTransformForJoin.java
new file mode 100644
index 00000000000..b36c488662f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/MetadataExpressionTransformForJoin.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.ir;
+
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
+
+import java.util.Map;
+
+public class MetadataExpressionTransformForJoin {
+
+ public static Expression transform(
+ Expression expression, Map<Symbol, ColumnSchema> tableAssignments) {
+ return new Visitor().process(expression, new Context(tableAssignments));
+ }
+
+ private static class Visitor extends RewritingVisitor<Context> {
+ @Override
+ protected Expression visitSymbolReference(SymbolReference node, Context
context) {
+ return new
SymbolReference(context.tableAssignments.get(Symbol.of(node.getName())).getName());
+ }
+ }
+
+ public static class Context {
+ Map<Symbol, ColumnSchema> tableAssignments;
+
+ public Context(Map<Symbol, ColumnSchema> tableAssignments) {
+ this.tableAssignments = tableAssignments;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 0e575f5dbd5..770ab775c05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -371,6 +371,10 @@ public class TableScanNode extends SourceNode {
this.outputSymbols = outputSymbols;
}
+ public void setAssignments(Map<Symbol, ColumnSchema> assignments) {
+ this.assignments = assignments;
+ }
+
public Map<Symbol, ColumnSchema> getAssignments() {
return this.assignments;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 9fc49530418..2a7648c6e63 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -20,6 +20,7 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
@@ -38,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.EqualityInference;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.MetadataExpressionTransformForJoin;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
@@ -50,6 +52,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
@@ -73,11 +76,11 @@ import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory
import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.MEASUREMENT;
import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.PredicateUtils.combineConjuncts;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.PredicateUtils.extractConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor.extractUnique;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator.isDeterministic;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.extractGlobalTimeFilter;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.filterDeterministicConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
@@ -126,7 +129,9 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
private final Analysis analysis;
private final Metadata metadata;
private Expression predicate;
- private SymbolAllocator symbolAllocator;
+ private final SymbolAllocator symbolAllocator;
+ private final QueryId queryId;
+ private boolean hasJoinNode;
Rewriter(
MPPQueryContext queryContext,
@@ -137,6 +142,7 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
this.analysis = analysis;
this.metadata = metadata;
this.symbolAllocator = symbolAllocator;
+ this.queryId = queryContext.getQueryId();
}
@Override
@@ -229,20 +235,21 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
}
// do index scan after expressionCanPushDown is processed
- tableMetadataIndexScan(tableScanNode,
splitExpression.getMetadataExpressions());
+ PlanNode resultNode =
+ tableMetadataIndexScan(tableScanNode,
splitExpression.getMetadataExpressions());
// exist expressions can not push down to scan operator
if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) {
List<Expression> expressions =
splitExpression.getExpressionsCannotPushDown();
return new FilterNode(
- queryContext.getQueryId().genPlanNodeId(),
- tableScanNode,
+ queryId.genPlanNodeId(),
+ resultNode,
expressions.size() == 1
? expressions.get(0)
: new LogicalExpression(LogicalExpression.Operator.AND,
expressions));
}
- return tableScanNode;
+ return resultNode;
}
private SplitExpression splitPredicate(TableScanNode node) {
@@ -295,6 +302,7 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
@Override
public PlanNode visitJoin(JoinNode node, Void context) {
+ hasJoinNode = true;
Expression inheritedPredicate = predicate;
// See if we can rewrite outer joins in terms of a plain inner join
@@ -382,19 +390,47 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
// leftPredicate = combineConjuncts(metadata, leftPredicate,
combineConjuncts(metadata,
// dynamicFiltersResult.getPredicates()));
- PlanNode leftSource = node.getLeftChild();
- PlanNode rightSource = node.getRightChild();
+ PlanNode leftSource;
+ PlanNode rightSource;
boolean equiJoinClausesUnmodified =
ImmutableSet.copyOf(equiJoinClauses).equals(ImmutableSet.copyOf(node.getCriteria()));
+ // TODO(beyyes) make the judgement code tidy
if (!equiJoinClausesUnmodified) {
- // leftSource = context.rewrite(new
ProjectNode(queryContext.getQueryId().genPlanNodeId(),
- // node.getLeftChild(), leftProjections.build()), leftPredicate);
- // rightSource = context.rewrite(new
ProjectNode(queryContext.getQueryId().genPlanNodeId(),
- // node.getRightChild(), rightProjections.build()), rightPredicate);
+ ProjectNode projectNode =
+ new ProjectNode(queryId.genPlanNodeId(), node.getLeftChild(),
leftProjections.build());
+ if (TRUE_LITERAL.equals(leftPredicate)) {
+ leftSource = projectNode.accept(this, context);
+ } else {
+ FilterNode filterNode =
+ new FilterNode(queryId.genPlanNodeId(), projectNode,
leftPredicate);
+ leftSource = filterNode.accept(this, context);
+ }
+
+ projectNode =
+ new ProjectNode(
+ queryId.genPlanNodeId(), node.getRightChild(),
rightProjections.build());
+ if (TRUE_LITERAL.equals(rightPredicate)) {
+ rightSource = projectNode.accept(this, context);
+ } else {
+ FilterNode filterNode =
+ new FilterNode(queryId.genPlanNodeId(), projectNode,
rightPredicate);
+ rightSource = filterNode.accept(this, context);
+ }
} else {
- // leftSource = context.rewrite(node.getLeftChild(), leftPredicate);
- // rightSource = context.rewrite(node.getRightChild(), rightPredicate);
- // TODO rewrite
+ if (TRUE_LITERAL.equals(leftPredicate)) {
+ leftSource = node.getLeftChild().accept(this, context);
+ } else {
+ FilterNode filterNode =
+ new FilterNode(queryId.genPlanNodeId(), node.getLeftChild(),
leftPredicate);
+ leftSource = filterNode.accept(this, context);
+ }
+ if (TRUE_LITERAL.equals(rightPredicate)) {
+ rightSource = node.getRightChild().accept(this, context);
+ } else {
+ FilterNode filterNode =
+ new FilterNode(queryId.genPlanNodeId(), node.getRightChild(),
rightPredicate);
+ rightSource = filterNode.accept(this, context);
+ }
}
Optional<Expression> newJoinFilter =
Optional.of(combineConjuncts(joinFilter));
@@ -403,12 +439,12 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
}
if (node.getJoinType() == INNER && newJoinFilter.isPresent() &&
equiJoinClauses.isEmpty()) {
+ throw new IllegalStateException("INNER JOIN only support
equiJoinClauses");
// if we do not have any equi conjunct we do not pushdown non-equality
condition into
// inner join, so we plan execution as nested-loops-join followed by
filter instead
// hash join.
- // todo: remove the code when we have support for filter function in
nested loop join
// postJoinPredicate = combineConjuncts(postJoinPredicate,
newJoinFilter.get());
- newJoinFilter = Optional.empty();
+ // newJoinFilter = Optional.empty();
}
boolean filtersEquivalent =
@@ -419,9 +455,8 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
if (leftSource != node.getLeftChild()
|| rightSource != node.getRightChild()
|| !filtersEquivalent
- ||
// !dynamicFilters.equals(node.getDynamicFilters()) ||
- !equiJoinClausesUnmodified) {
+ || !equiJoinClausesUnmodified) {
leftSource =
new ProjectNode(
queryContext.getQueryId().genPlanNodeId(), leftSource,
leftProjections.build());
@@ -726,8 +761,7 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
@Override
public PlanNode visitTableScan(TableScanNode node, Void context) {
- tableMetadataIndexScan(node, Collections.emptyList());
- return node;
+ return tableMetadataIndexScan(node, Collections.emptyList());
}
@Override
@@ -741,30 +775,70 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
}
/** Get deviceEntries and DataPartition used in TableScan. */
- private void tableMetadataIndexScan(TableScanNode node, List<Expression>
metadataExpressions) {
+ private PlanNode tableMetadataIndexScan(
+ TableScanNode tableScanNode, List<Expression> metadataExpressions) {
+ // for join operator, columnSymbols in TableScanNode is renamed, which
adds suffix for origin
+ // column name,
+ // add a new ProjectNode above TableScanNode.
+ boolean tableScanNodeColumnsRenamed = false;
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableScanNode.getAssignments().entrySet()) {
+ Symbol columnSymbol = entry.getKey();
+ ColumnSchema columnSchema = entry.getValue();
+ if (!columnSymbol.getName().equals(columnSchema.getName())) {
+ tableScanNodeColumnsRenamed = true;
+ break;
+ }
+ }
+ ProjectNode newProjectNode = null;
+ if (tableScanNodeColumnsRenamed) {
+ metadataExpressions.replaceAll(
+ expression1 ->
+ MetadataExpressionTransformForJoin.transform(
+ expression1, tableScanNode.getAssignments()));
+
+ Assignments.Builder projectAssignments = Assignments.builder();
+ ImmutableMap.Builder<Symbol, ColumnSchema> tableScanAssignments =
ImmutableMap.builder();
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableScanNode.getAssignments().entrySet()) {
+ Symbol columnSymbol = entry.getKey();
+ ColumnSchema columnSchema = entry.getValue();
+ projectAssignments.put(columnSymbol, new
SymbolReference(columnSchema.getName()));
+ tableScanAssignments.put(Symbol.of(columnSchema.getName()),
columnSchema);
+ }
+ newProjectNode =
+ new ProjectNode(queryId.genPlanNodeId(), tableScanNode,
projectAssignments.build());
+ tableScanNode.setAssignments(tableScanAssignments.build());
+ }
+
List<String> attributeColumns = new ArrayList<>();
int attributeIndex = 0;
- for (Symbol columnName : node.getAssignments().keySet()) {
- if
(ATTRIBUTE.equals(node.getAssignments().get(columnName).getColumnCategory())) {
- attributeColumns.add(columnName.getName());
- node.getIdAndAttributeIndexMap().put(columnName, attributeIndex++);
+ for (Map.Entry<Symbol, ColumnSchema> entry :
tableScanNode.getAssignments().entrySet()) {
+ Symbol columnSymbol = entry.getKey();
+ ColumnSchema columnSchema = entry.getValue();
+ if (ATTRIBUTE.equals(columnSchema.getColumnCategory())) {
+ attributeColumns.add(columnSchema.getName());
+ tableScanNode.getIdAndAttributeIndexMap().put(columnSymbol,
attributeIndex++);
}
}
List<DeviceEntry> deviceEntries =
metadata.indexScan(
- node.getQualifiedObjectName(), metadataExpressions,
attributeColumns, queryContext);
- node.setDeviceEntries(deviceEntries);
+ tableScanNode.getQualifiedObjectName(),
+ metadataExpressions,
+ attributeColumns,
+ queryContext);
+ tableScanNode.setDeviceEntries(deviceEntries);
if (deviceEntries.isEmpty()) {
analysis.setFinishQueryAfterAnalyze();
analysis.setEmptyDataSource(true);
} else {
Filter timeFilter =
- node.getTimePredicate()
+ tableScanNode
+ .getTimePredicate()
.map(value -> value.accept(new
ConvertPredicateToTimeFilterVisitor(), null))
.orElse(null);
- node.setTimeFilter(timeFilter);
- String treeModelDatabase = "root." +
node.getQualifiedObjectName().getDatabaseName();
+ tableScanNode.setTimeFilter(timeFilter);
+ String treeModelDatabase =
+ "root." + tableScanNode.getQualifiedObjectName().getDatabaseName();
DataPartition dataPartition =
fetchDataPartitionByDevices(treeModelDatabase, deviceEntries,
timeFilter);
@@ -780,6 +854,12 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
analysis.upsertDataPartition(dataPartition);
}
}
+
+ if (tableScanNodeColumnsRenamed) {
+ return newProjectNode;
+ } else {
+ return tableScanNode;
+ }
}
private DataPartition fetchDataPartitionByDevices(