asolimando commented on code in PR #4267: URL: https://github.com/apache/calcite/pull/4267#discussion_r2025234307
########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code Review Comment: I know it's more verbose, but I think it's easier to follow if we show the rewrite proposed for the whole query, can you please add that here? It's 100% clear upon reading the code, but people should be able to get a precise understanding of what to expect from the doc alone. ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} Review Comment: ```suggestion * comes from {@link RelMetadataQuery#getExpressionLineage} ``` ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); Review Comment: Nit: can't you handle the case for element 0 in the for loop as well? ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { Review Comment: ```suggestion if (!tableRef.equals(inputBitTableRef)) { ``` you know from line 275 that `tableRef` is not null, `equals` will return `false` in case `inputBitTableRef` is `null` or different, so you can handle both cases in a single shot ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { + return null; + } + } + return tableRef; + } + + /** + * For additional predicates of each operand in conjuction, all of them should be retained and + * use 'AND' to combine expressions. + * Review Comment: Can you provide a simple example in the javadoc explaining what the method does? similarly for `mergeOr`. ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); Review Comment: Prefer interface to concrete classes unless there is a good reason not to, you might consider using the `final` modifier to stress the fact that the variable is not going to be overwritten later on (the method is long enough that such hint helps), in this and other cases where relevant. ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it Review Comment: ```suggestion // If the column comes from multiple tables, skip it ``` ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { + return null; + } + } + return tableRef; + } + + /** + * For additional predicates of each operand in conjuction, all of them should be retained and Review Comment: conjuNction (I saw Mihai spotted a similar mispelling for disjunction), can you do a massive grep on this? ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); Review Comment: Nit: you should group together (i.e., no newline) lines that make sense to be considered together (like a variable used in a loop coming right after it, or a variable used in the following statement), and split groups of lines that are conceptually separated, like "extract input ref and their mappings to tables" and "computing the new filter conditions". I know it's not a deal breaker, but I find it quite hard to read as-is, it would be nice if you could do a pass over the code to improve readability. Here, for instance, I'd put a newline between 112 and 113 (which can go with the `if` that follows as they are strictly related). For formatting, you can generally take a look around at similar code to take inspiration. ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { + return null; + } + } + return tableRef; + } + + /** + * For additional predicates of each operand in conjuction, all of them should be retained and + * use 'AND' to combine expressions. + * + * @param baseMap Additional predicates that current conjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeAnd( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + for (Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry : forMergeMap.entrySet()) { + RexNode mergeExpression = + relBuilder.and( + entry.getValue(), + baseMap.getOrDefault(entry.getKey(), relBuilder.literal(true))); + baseMap.put(entry.getKey(), mergeExpression); + } + } + + /** + * Only if all operands in disjunction have additional predicates for table t1, we use 'OR' + * to combine expressions and retain them as additional predicates of table t1. + * + * @param baseMap Additional predicates that current disjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeOr( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + if (baseMap.isEmpty()) { + return; + } + + Iterator<Map.Entry<RexTableInputRef.RelTableRef, RexNode>> iterator = + baseMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry = iterator.next(); + if (!forMergeMap.containsKey(entry.getKey())) { + iterator.remove(); + continue; + } + RexNode mergedRex = + relBuilder.or( + entry.getValue(), + forMergeMap.get(entry.getKey())); + baseMap.put(entry.getKey(), mergedRex); + } + } + + private void incrementAndCheck() { + if (maxProcessLimitNodes > 0 && ++currentCount > maxProcessLimitNodes) { + throw OverLimitException.INSTANCE; Review Comment: throwing creates a stacktrace, which is a very expensive operation, we should use a boolean guard (possibly in an array if mutating it in other places is needed) to indicate that we should "exit", you can probably take inspiration from how [RexShuttle does it](https://github.com/apache/calcite/blob/b394abca407eea06ee8f8b50f30bd443f286bb42/core/src/main/java/org/apache/calcite/rex/RexShuttle.java#L42) ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( Review Comment: Nit: inputRefsBelongToOneTable? ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( Review Comment: This reminds me somewhat a `RexVisitor`, can you take a look at some implementations in the project to see if it can fit your needs? ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { + return null; + } + } + return tableRef; + } + + /** + * For additional predicates of each operand in conjuction, all of them should be retained and + * use 'AND' to combine expressions. + * + * @param baseMap Additional predicates that current conjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeAnd( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + for (Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry : forMergeMap.entrySet()) { + RexNode mergeExpression = + relBuilder.and( + entry.getValue(), + baseMap.getOrDefault(entry.getKey(), relBuilder.literal(true))); + baseMap.put(entry.getKey(), mergeExpression); + } + } + + /** + * Only if all operands in disjunction have additional predicates for table t1, we use 'OR' + * to combine expressions and retain them as additional predicates of table t1. + * + * @param baseMap Additional predicates that current disjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeOr( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + if (baseMap.isEmpty()) { + return; + } + + Iterator<Map.Entry<RexTableInputRef.RelTableRef, RexNode>> iterator = + baseMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry = iterator.next(); + if (!forMergeMap.containsKey(entry.getKey())) { + iterator.remove(); + continue; + } + RexNode mergedRex = + relBuilder.or( + entry.getValue(), + forMergeMap.get(entry.getKey())); + baseMap.put(entry.getKey(), mergedRex); + } + } + + private void incrementAndCheck() { + if (maxProcessLimitNodes > 0 && ++currentCount > maxProcessLimitNodes) { + throw OverLimitException.INSTANCE; + } + } + + /** Exception to catch when we pass the limit. */ + private static class OverLimitException extends ControlFlowException { + protected static final OverLimitException INSTANCE = new OverLimitException(); + + private OverLimitException() {} + } + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface Config extends RelRule.Config { + Config FILTER = ImmutableExpandDisjuctionRule.Config.builder() + .withMatchHandler(ExpandDisjuctionRule::matchFilter) + .build() + .withOperandSupplier(b -> + b.operand(Filter.class).anyInputs()); + + Config JOIN = ImmutableExpandDisjuctionRule.Config.builder() + .withMatchHandler(ExpandDisjuctionRule::matchJoin) + .build() + .withOperandSupplier(b -> + b.operand(Join.class).anyInputs()); + + @Override default ExpandDisjuctionRule toRule() { + return new ExpandDisjuctionRule(this); + } + + default int processLimit() { + return 10000; Review Comment: This is configurable, right? 10k is just the default? ########## core/src/main/java/org/apache/calcite/rel/rules/ExpandDisjuctionRule.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ControlFlowException; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Rule to expand disjuction in condition of a {@link Filter} or {@link Join}, + * It makes sense to make this optimization part of the predicate pushdown. For example: + * + * <blockquote><pre>{@code + * select t1.name from t1, t2 + * where t1.id = t2.id + * and ( + * (t1.id > 20 and t2.height < 50) + * or + * (t1.weight < 200 and t2.sales > 100) + * ) + * }</pre></blockquote> + * + * <p>we can expand to obtain the condition + * + * <blockquote><pre>{@code + * t1.id > 20 or t1.weight < 200 + * t2.height < 50 or t2.sales > 100 + * }</pre></blockquote> + * + * <p>new generated predicates are redundant, but they could be pushed down to + * scan operator of t1/t2 and reduce the cardinality. + * + * <p>This rule should only be applied once to avoid generate same redundant expression and + * it should be used before {@link CoreRules#FILTER_INTO_JOIN} and {@link CoreRules#JOIN_CONDITION_PUSH}. + * + * @see CoreRules#EXPAND_FILTER_DISJUCTION + * @see CoreRules#EXPAND_JOIN_DISJUCTION + */ [email protected] +public class ExpandDisjuctionRule + extends RelRule<ExpandDisjuctionRule.Config> + implements TransformationRule { + + /** + * Creates a ExpandDisjuctionRule. + */ + protected ExpandDisjuctionRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + config.matchHandler().accept(this, call); + } + + /** + * Expand predicates for condition of a {@link Filter} or {@link Join}. + * + * @param condition Condition to be expanded in Filter or Join + * @param relNode The Filter or Join node + * @param fieldList The field list of the Filter or Join inputs. Build the referenced columns + * in the predicate as RexInputRef and find out which table the RexInputRef + * comes from via {@link RelMetadataQuery#getExpressionLineage} + * @param relBuilder Builder + * @param mq RelMetadataQuery, which is used to get the expression lineage + * + * @return Expanded predicates + */ + private RexNode apply( + RexNode condition, + RelNode relNode, + List<RelDataTypeField> fieldList, + RelBuilder relBuilder, + RelMetadataQuery mq) { + HashMap<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef = new HashMap<>(); + ImmutableBitSet columnBits = RelOptUtil.InputFinder.bits(condition); + if (columnBits.isEmpty()) { + return condition; + } + // Trace which table does the column referenced in the condition come from + for (int columnBit : columnBits.asList()) { + Set<RexNode> exprLineage = + mq.getExpressionLineage( + relNode, + RexInputRef.of(columnBit, fieldList)); + // If mq.getExpressionLineage cannot get result, skip it + if (exprLineage == null) { + continue; + } + Set<RexTableInputRef.RelTableRef> relTableRefs = + RexUtil.gatherTableReferences(Lists.newArrayList(exprLineage)); + // If the column come from multiple tables, skip it + if (relTableRefs.isEmpty() || relTableRefs.size() > 1) { + continue; + } + inputRefToTableRef.put(columnBit, relTableRefs.iterator().next()); + } + + ExpandDisjuctionHelper expandHelper = + new ExpandDisjuctionHelper(inputRefToTableRef, relBuilder, config.processLimit()); + Map<RexTableInputRef.RelTableRef, RexNode> expandResult = expandHelper.expand(condition); + RexNode newCondition = condition; + for (RexNode expandCondition : expandResult.values()) { + newCondition = relBuilder.and(newCondition, expandCondition); + } + return newCondition; + } + + private static void matchFilter(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Filter filter = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + RexNode newCondition = + rule.apply( + filter.getCondition(), + filter, + filter.getRowType().getFieldList(), + relBuilder, + mq); + if (newCondition.equals(filter.getCondition())) { + return; + } + Filter newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + call.transformTo(newFilter); + } + + private static void matchJoin(ExpandDisjuctionRule rule, RelOptRuleCall call) { + Join join = call.rel(0); + RelMetadataQuery mq = call.getMetadataQuery(); + RelBuilder relBuilder = call.builder(); + + List<RelDataTypeField> fieldList = + Lists.newArrayList(join.getLeft().getRowType().getFieldList()); + fieldList.addAll(join.getRight().getRowType().getFieldList()); + RexNode newCondition = + rule.apply( + join.getCondition(), + join, + fieldList, + relBuilder, + mq); + if (newCondition.equals(join.getCondition())) { + return; + } + Join newJoin = + join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); + call.transformTo(newJoin); + } + + /** + * Helper class to expand predicates. + */ + private static class ExpandDisjuctionHelper { + + private final Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef; + + private final RelBuilder relBuilder; + + private final int maxProcessLimitNodes; + + private int currentCount; + + private ExpandDisjuctionHelper( + Map<Integer, RexTableInputRef.RelTableRef> inputRefToTableRef, + RelBuilder relBuilder, + int maxProcessLimitNodes) { + this.inputRefToTableRef = inputRefToTableRef; + this.relBuilder = relBuilder; + this.maxProcessLimitNodes = maxProcessLimitNodes; + } + + private Map<RexTableInputRef.RelTableRef, RexNode> expand(RexNode condition) { + try { + this.currentCount = 0; + return expandDeep(condition); + } catch (OverLimitException e) { + return new HashMap<>(); + } + } + + /** + * Expand predicates recursively that can be pushed down to single table. + * + * @param condition Predicate to be expanded + * @return Additional predicates that can be pushed down for each table + */ + private Map<RexTableInputRef.RelTableRef, RexNode> expandDeep(RexNode condition) { + incrementAndCheck(); + Map<RexTableInputRef.RelTableRef, RexNode> additionalConditions = new HashMap<>(); + ImmutableBitSet inputRefs = RelOptUtil.InputFinder.bits(condition); + if (inputRefs.isEmpty()) { + return additionalConditions; + } + RexTableInputRef.RelTableRef tableRef = inputRefsBelongOneTable(inputRefs); + // The condition already belongs to one table, return it directly + if (tableRef != null) { + additionalConditions.put(tableRef, condition); + return additionalConditions; + } + + // Recursively expand the expression according to whether it is a conjunction + // or a disjunction. If it is neither a disjunction nor a conjunction, it cannot + // be expanded further and an empty Map is returned. + switch (condition.getKind()) { + case AND: + List<RexNode> andOperands = RexUtil.flattenAnd(((RexCall) condition).getOperands()); + for (RexNode andOperand : andOperands) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = expandDeep(andOperand); + mergeAnd(additionalConditions, operandResult); + } + break; + case OR: + List<RexNode> orOperands = RexUtil.flattenOr(((RexCall) condition).getOperands()); + additionalConditions.putAll(expandDeep(orOperands.get(0))); + for (int i = 1; i < orOperands.size(); i++) { + Map<RexTableInputRef.RelTableRef, RexNode> operandResult = + expandDeep(orOperands.get(i)); + mergeOr(additionalConditions, operandResult); + } + break; + default: + break; + } + + return additionalConditions; + } + + private RexTableInputRef.@Nullable RelTableRef inputRefsBelongOneTable( + ImmutableBitSet inputRefs) { + RexTableInputRef.RelTableRef tableRef = inputRefToTableRef.get(inputRefs.nth(0)); + if (tableRef == null) { + return null; + } + for (int inputBit : inputRefs.asList()) { + RexTableInputRef.RelTableRef inputBitTableRef = inputRefToTableRef.get(inputBit); + if (inputBitTableRef == null || !inputBitTableRef.equals(tableRef)) { + return null; + } + } + return tableRef; + } + + /** + * For additional predicates of each operand in conjuction, all of them should be retained and + * use 'AND' to combine expressions. + * + * @param baseMap Additional predicates that current conjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeAnd( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + for (Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry : forMergeMap.entrySet()) { + RexNode mergeExpression = + relBuilder.and( + entry.getValue(), + baseMap.getOrDefault(entry.getKey(), relBuilder.literal(true))); + baseMap.put(entry.getKey(), mergeExpression); + } + } + + /** + * Only if all operands in disjunction have additional predicates for table t1, we use 'OR' + * to combine expressions and retain them as additional predicates of table t1. + * + * @param baseMap Additional predicates that current disjunction has already saved + * @param forMergeMap Additional predicates that current operand has expanded + */ + private void mergeOr( + Map<RexTableInputRef.RelTableRef, RexNode> baseMap, + Map<RexTableInputRef.RelTableRef, RexNode> forMergeMap) { + if (baseMap.isEmpty()) { + return; + } + + Iterator<Map.Entry<RexTableInputRef.RelTableRef, RexNode>> iterator = + baseMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<RexTableInputRef.RelTableRef, RexNode> entry = iterator.next(); + if (!forMergeMap.containsKey(entry.getKey())) { + iterator.remove(); + continue; + } + RexNode mergedRex = + relBuilder.or( + entry.getValue(), + forMergeMap.get(entry.getKey())); + baseMap.put(entry.getKey(), mergedRex); + } + } + + private void incrementAndCheck() { + if (maxProcessLimitNodes > 0 && ++currentCount > maxProcessLimitNodes) { + throw OverLimitException.INSTANCE; + } + } + + /** Exception to catch when we pass the limit. */ + private static class OverLimitException extends ControlFlowException { + protected static final OverLimitException INSTANCE = new OverLimitException(); + + private OverLimitException() {} + } + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface Config extends RelRule.Config { + Config FILTER = ImmutableExpandDisjuctionRule.Config.builder() + .withMatchHandler(ExpandDisjuctionRule::matchFilter) + .build() + .withOperandSupplier(b -> + b.operand(Filter.class).anyInputs()); Review Comment: Nit: If you fit into 80 chars you can always place it in a single line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
