This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch support_exists_and_correlate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8beeb0c3b49d5bd958067502f4fdf8abb72aa832 Author: lancelly <[email protected]> AuthorDate: Sun Jan 12 14:56:18 2025 +0800 add rules and join check --- .../plan/planner/TableOperatorGenerator.java | 18 +++ .../iterative/rule/RemoveRedundantExists.java | 104 ++++++++++++++ .../planner/optimizations/JoinUtils.java | 159 +++++++++++++++++++++ .../optimizations/LogicalOptimizeFactory.java | 5 +- .../optimizations/PushPredicateIntoTableScan.java | 20 ++- 5 files changed, 302 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index efd8323af07..c50fc0aca1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -239,6 +239,7 @@ import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST; import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT; @@ -1404,6 +1405,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution dataTypes); } + semanticCheckForJoin(node); + int size = node.getCriteria().size(); int[] leftJoinKeyPositions = new 
int[size];
     for (int i = 0; i < size; i++) {
@@ -1491,6 +1494,21 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
     throw new IllegalStateException("Unsupported join type: " + node.getJoinType());
   }
 
+  /** Throws SemanticException if the join has a non-trivial filter or no join keys. */
+  private void semanticCheckForJoin(JoinNode node) {
+    // Format the filter message only inside the branch: calling Optional.get() on an
+    // absent filter throws NoSuchElementException instead of passing the check.
+    if (node.getFilter().isPresent() && !node.getFilter().get().equals(TRUE_LITERAL)) {
+      throw new SemanticException(
+          String.format(
+              "Filter is not supported in %s. Filter is %s.",
+              node.getJoinType(), node.getFilter().get()));
+    }
+    if (node.getCriteria().isEmpty()) {
+      throw new SemanticException(String.format("%s must have join keys.", node.getJoinType()));
+    }
+  }
+
   private BiFunction<Column, Integer, Column> buildUpdateLastRowFunction(Type joinKeyType) {
     switch (joinKeyType.getTypeEnum()) {
       case INT32:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantExists.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantExists.java
new file mode 100644
index 00000000000..8af7e07af92
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantExists.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Cardinality; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.extractCardinality; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.FALSE_LITERAL; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; + +/** + * Given: + * + * <pre> + * - Apply [X.*, e = EXISTS (true)] + * - X + * - S with cardinality >= 1 + * </pre> + * + * <p>Produces: + * + * <pre> + * - Project [X.*, e = true] + * - X + * </pre> + * + * <p>Given: + * + * <pre> + * - Apply [X.*, e = EXISTS (true)] + * - X + * - S with cardinality = 0 + * </pre> + * + * <p>Produces: 
+ * + * <pre> + * - Project [X.*, e = false] + * - X + * </pre> + */ +public class RemoveRedundantExists implements Rule<ApplyNode> { + private static final Pattern<ApplyNode> PATTERN = + applyNode() + .matching( + node -> + node.getSubqueryAssignments().values().stream() + .allMatch(ApplyNode.Exists.class::isInstance)); + + @Override + public Pattern<ApplyNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(ApplyNode node, Captures captures, Context context) { + Assignments.Builder assignments = Assignments.builder(); + assignments.putIdentities(node.getInput().getOutputSymbols()); + + Cardinality subqueryCardinality = extractCardinality(node.getSubquery(), context.getLookup()); + Expression result; + if (subqueryCardinality.isEmpty()) { + result = FALSE_LITERAL; + } else if (subqueryCardinality.isAtLeastScalar()) { + result = TRUE_LITERAL; + } else { + return Result.empty(); + } + + for (Symbol output : node.getSubqueryAssignments().keySet()) { + assignments.put(output, result); + } + + return Result.ofPlanNode( + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), node.getInput(), assignments.build())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java index 8d317cc09d8..80e1b586995 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java @@ -335,4 +335,163 @@ public class JoinUtils { return postJoinPredicate; } } + + static OuterJoinPushDownResult processLimitedOuterJoin( + Metadata metadata, + Expression inheritedPredicate, + Expression outerEffectivePredicate, + Expression innerEffectivePredicate, + Expression 
joinPredicate, + Collection<Symbol> outerSymbols, + Collection<Symbol> innerSymbols) { + checkArgument( + outerSymbols.containsAll(extractUnique(outerEffectivePredicate)), + "outerEffectivePredicate must only contain symbols from outerSymbols"); + checkArgument( + innerSymbols.containsAll(extractUnique(innerEffectivePredicate)), + "innerEffectivePredicate must only contain symbols from innerSymbols"); + + ImmutableList.Builder<Expression> outerPushdownConjuncts = ImmutableList.builder(); + ImmutableList.Builder<Expression> innerPushdownConjuncts = ImmutableList.builder(); + ImmutableList.Builder<Expression> postJoinConjuncts = ImmutableList.builder(); + ImmutableList.Builder<Expression> joinConjuncts = ImmutableList.builder(); + + // Strip out non-deterministic conjuncts + extractConjuncts(inheritedPredicate).stream() + .filter(expression -> !isDeterministic(expression)) + .forEach(postJoinConjuncts::add); + inheritedPredicate = filterDeterministicConjuncts(inheritedPredicate); + + outerEffectivePredicate = filterDeterministicConjuncts(outerEffectivePredicate); + innerEffectivePredicate = filterDeterministicConjuncts(innerEffectivePredicate); + extractConjuncts(joinPredicate).stream() + .filter(expression -> !isDeterministic(expression)) + .forEach(joinConjuncts::add); + joinPredicate = filterDeterministicConjuncts(joinPredicate); + + // Generate equality inferences + EqualityInference inheritedInference = new EqualityInference(metadata, inheritedPredicate); + EqualityInference outerInference = + new EqualityInference(metadata, inheritedPredicate, outerEffectivePredicate); + + Set<Symbol> innerScope = ImmutableSet.copyOf(innerSymbols); + Set<Symbol> outerScope = ImmutableSet.copyOf(outerSymbols); + + EqualityInference.EqualityPartition equalityPartition = + inheritedInference.generateEqualitiesPartitionedBy(outerScope); + Expression outerOnlyInheritedEqualities = + combineConjuncts(equalityPartition.getScopeEqualities()); + EqualityInference 
potentialNullSymbolInference = + new EqualityInference( + metadata, + outerOnlyInheritedEqualities, + outerEffectivePredicate, + innerEffectivePredicate, + joinPredicate); + + // Push outer and join equalities into the inner side. For example: + // SELECT * FROM nation LEFT OUTER JOIN region ON nation.regionkey = region.regionkey and + // nation.name = region.name WHERE nation.name = 'blah' + + EqualityInference potentialNullSymbolInferenceWithoutInnerInferred = + new EqualityInference( + metadata, outerOnlyInheritedEqualities, outerEffectivePredicate, joinPredicate); + innerPushdownConjuncts.addAll( + potentialNullSymbolInferenceWithoutInnerInferred + .generateEqualitiesPartitionedBy(innerScope) + .getScopeEqualities()); + + // TODO: we can further improve simplifying the equalities by considering other relationships + // from the outer side + EqualityInference.EqualityPartition joinEqualityPartition = + new EqualityInference(metadata, joinPredicate).generateEqualitiesPartitionedBy(innerScope); + innerPushdownConjuncts.addAll(joinEqualityPartition.getScopeEqualities()); + joinConjuncts + .addAll(joinEqualityPartition.getScopeComplementEqualities()) + .addAll(joinEqualityPartition.getScopeStraddlingEqualities()); + + // Add the equalities from the inferences back in + outerPushdownConjuncts.addAll(equalityPartition.getScopeEqualities()); + postJoinConjuncts.addAll(equalityPartition.getScopeComplementEqualities()); + postJoinConjuncts.addAll(equalityPartition.getScopeStraddlingEqualities()); + + // See if we can push inherited predicates down + EqualityInference.nonInferrableConjuncts(metadata, inheritedPredicate) + .forEach( + conjunct -> { + Expression outerRewritten = outerInference.rewrite(conjunct, outerScope); + if (outerRewritten != null) { + outerPushdownConjuncts.add(outerRewritten); + + // A conjunct can only be pushed down into an inner side if it can be rewritten in + // terms of the outer side + Expression innerRewritten = + 
potentialNullSymbolInference.rewrite(outerRewritten, innerScope); + if (innerRewritten != null) { + innerPushdownConjuncts.add(innerRewritten); + } + } else { + postJoinConjuncts.add(conjunct); + } + }); + + // See if we can push down any outer effective predicates to the inner side + EqualityInference.nonInferrableConjuncts(metadata, outerEffectivePredicate) + .map(conjunct -> potentialNullSymbolInference.rewrite(conjunct, innerScope)) + .filter(Objects::nonNull) + .forEach(innerPushdownConjuncts::add); + + // See if we can push down join predicates to the inner side + EqualityInference.nonInferrableConjuncts(metadata, joinPredicate) + .forEach( + conjunct -> { + Expression innerRewritten = + potentialNullSymbolInference.rewrite(conjunct, innerScope); + if (innerRewritten != null) { + innerPushdownConjuncts.add(innerRewritten); + } else { + joinConjuncts.add(conjunct); + } + }); + + return new OuterJoinPushDownResult( + combineConjuncts(outerPushdownConjuncts.build()), + combineConjuncts(innerPushdownConjuncts.build()), + combineConjuncts(joinConjuncts.build()), + combineConjuncts(postJoinConjuncts.build())); + } + + static class OuterJoinPushDownResult { + private final Expression outerJoinPredicate; + private final Expression innerJoinPredicate; + private final Expression joinPredicate; + private final Expression postJoinPredicate; + + private OuterJoinPushDownResult( + Expression outerJoinPredicate, + Expression innerJoinPredicate, + Expression joinPredicate, + Expression postJoinPredicate) { + this.outerJoinPredicate = outerJoinPredicate; + this.innerJoinPredicate = innerJoinPredicate; + this.joinPredicate = joinPredicate; + this.postJoinPredicate = postJoinPredicate; + } + + public Expression getOuterJoinPredicate() { + return outerJoinPredicate; + } + + public Expression getInnerJoinPredicate() { + return innerJoinPredicate; + } + + public Expression getJoinPredicate() { + return joinPredicate; + } + + public Expression getPostJoinPredicate() { + return 
postJoinPredicate; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index ca27877079d..834fa51d281 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pu import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughProject; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveDuplicateConditions; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantEnforceSingleRowNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantExists; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantIdentityProjections; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveTrivialFilters; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveUnreferencedScalarApplyNodes; @@ -183,8 +184,8 @@ public class LogicalOptimizeFactory { // new ReplaceRedundantJoinWithSource(), // new RemoveRedundantJoin(), // new ReplaceRedundantJoinWithProject(), - new RemoveRedundantEnforceSingleRowNode() - // new RemoveRedundantExists(), + new RemoveRedundantEnforceSingleRowNode(), + new RemoveRedundantExists() // new RemoveRedundantWindow(), // new SingleDistinctAggregationToGroupBy(), // new MergeLimitWithDistinct(), diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 286c53d91b7..629dfb59fd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -105,6 +105,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizati import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.extractJoinPredicate; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.joinEqualityExpression; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processInnerJoin; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processLimitedOuterJoin; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.extractCardinality; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL; @@ -578,6 +579,21 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { postJoinPredicate = innerJoinPushDownResult.getPostJoinPredicate(); newJoinPredicate = innerJoinPushDownResult.getJoinPredicate(); break; + case LEFT: + JoinUtils.OuterJoinPushDownResult leftOuterJoinPushDownResult = + processLimitedOuterJoin( + metadata, + inheritedPredicate, + leftEffectivePredicate, + rightEffectivePredicate, + joinPredicate, + node.getLeftChild().getOutputSymbols(), + 
node.getRightChild().getOutputSymbols()); + leftPredicate = leftOuterJoinPushDownResult.getOuterJoinPredicate(); + rightPredicate = leftOuterJoinPushDownResult.getInnerJoinPredicate(); + postJoinPredicate = leftOuterJoinPushDownResult.getPostJoinPredicate(); + newJoinPredicate = leftOuterJoinPushDownResult.getJoinPredicate(); + break; case FULL: leftPredicate = TRUE_LITERAL; rightPredicate = TRUE_LITERAL; @@ -585,8 +601,8 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { newJoinPredicate = joinPredicate; break; default: - throw new IllegalStateException( - "Only support INNER JOIN and FULL OUTER JOIN in current version"); + throw new IllegalArgumentException( + "Unsupported join type in predicate push down: " + node.getJoinType().name()); } // newJoinPredicate = simplifyExpression(newJoinPredicate);
