This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f53ef1fad8f45083cbad34eaaf2a67fb7fa9df4 Author: Jark Wu <[email protected]> AuthorDate: Fri Mar 10 22:46:47 2023 +0800 [FLINK-30129][table] Push projection through ChangelogNormalize This closes #22156 --- .../PushFilterPastChangelogNormalizeRule.java | 225 ++++++++++++++------- .../PushFilterPastChangelogNormalizeRuleTest.java | 64 ++++++ .../PushFilterPastChangelogNormalizeRuleTest.xml | 83 +++++++- 3 files changed, 287 insertions(+), 85 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java index c9ef634d369..d9178061f9f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java @@ -22,16 +22,21 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLocalRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; 
import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.Pair; @@ -40,7 +45,9 @@ import org.immutables.value.Value; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -48,15 +55,16 @@ import java.util.stream.IntStream; import static org.apache.flink.table.planner.plan.utils.RexNodeExtractor.extractRefInputFields; /** - * Pushes primary key filters through a {@link StreamPhysicalChangelogNormalize ChangelogNormalize} - * operator to reduce its state size. + * Pushes primary key filters and the projection of used fields through a {@link + * StreamPhysicalChangelogNormalize ChangelogNormalize} operator to reduce its state size. * * <p>This rule looks for Calc → ChangelogNormalize where the {@link StreamPhysicalCalc Calc} - * contains a filter condition. The condition is transformed into CNF and then each conjunction is - * tested for whether it affects only primary key columns. If such conditions exist, they are moved - * into a new, separate Calc and pushed through the ChangelogNormalize operator. ChangelogNormalize - * keeps state for every unique key it encounters, thus pushing filters on the primary key in front - * of it helps reduce the size of its state. + * contains a filter condition or a projection. The condition is transformed into CNF and then each + * conjunction is tested for whether it affects only primary key columns. If such conditions or + * a projection exist, they are moved into a new, separate Calc and pushed through the + * ChangelogNormalize operator. 
ChangelogNormalize keeps state for every unique key it encounters, + * thus pushing filters on the primary key and projection on values in front of it helps reduce the + * size of its state. * * <p>Note that pushing primary key filters is safe to do, but pushing any other filters can lead to * incorrect results. @@ -77,31 +85,55 @@ public class PushFilterPastChangelogNormalizeRule public void onMatch(RelOptRuleCall call) { final StreamPhysicalCalc calc = call.rel(0); final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(1); - - final RexProgram program = calc.getProgram(); - final RexNode condition = - RexUtil.toCnf( - call.builder().getRexBuilder(), - program.expandLocalRef(program.getCondition())); - final Set<Integer> primaryKeyIndices = IntStream.of(changelogNormalize.uniqueKeys()).boxed().collect(Collectors.toSet()); // Determine which filters can be pushed (= involve only primary key columns) final List<RexNode> primaryKeyPredicates = new ArrayList<>(); final List<RexNode> otherPredicates = new ArrayList<>(); - partitionPrimaryKeyPredicates( - RelOptUtil.conjunctions(condition), - primaryKeyIndices, - primaryKeyPredicates, - otherPredicates); + final RexProgram program = calc.getProgram(); + if (program.getCondition() != null) { + final RexNode condition = + RexUtil.toCnf( + call.builder().getRexBuilder(), + program.expandLocalRef(program.getCondition())); + partitionPrimaryKeyPredicates( + RelOptUtil.conjunctions(condition), + primaryKeyIndices, + primaryKeyPredicates, + otherPredicates); + } + + // used input field indices + int[] usedInputFields = extractUsedInputFields(calc, primaryKeyIndices); - // Construct a new ChangelogNormalize which has primary key filters pushed into it + // Construct a new ChangelogNormalize which has used fields project + // and primary key filters pushed into it final StreamPhysicalChangelogNormalize newChangelogNormalize = - pushFiltersThroughChangelogNormalize(call, primaryKeyPredicates); + 
pushCalcThroughChangelogNormalize(call, primaryKeyPredicates, usedInputFields); // Retain only filters which haven't been pushed - transformWithRemainingPredicates(call, newChangelogNormalize, otherPredicates); + transformWithRemainingPredicates( + call, newChangelogNormalize, otherPredicates, usedInputFields); + } + + /** Extracts input fields which are used in the Calc node and the ChangelogNormalize node. */ + private int[] extractUsedInputFields(StreamPhysicalCalc calc, Set<Integer> primaryKeyIndices) { + RexProgram program = calc.getProgram(); + List<RexNode> projectsAndCondition = + program.getProjectList().stream() + .map(program::expandLocalRef) + .collect(Collectors.toList()); + if (program.getCondition() != null) { + projectsAndCondition.add(program.expandLocalRef(program.getCondition())); + } + Set<Integer> projectedFields = + Arrays.stream(extractRefInputFields(projectsAndCondition)) + .boxed() + .collect(Collectors.toSet()); + // we can't project primary keys + projectedFields.addAll(primaryKeyIndices); + return projectedFields.stream().sorted().mapToInt(Integer::intValue).toArray(); } /** @@ -123,43 +155,64 @@ public class PushFilterPastChangelogNormalizeRule } } - /** Pushes {@param primaryKeyPredicates} into the {@link StreamPhysicalChangelogNormalize}. */ - private StreamPhysicalChangelogNormalize pushFiltersThroughChangelogNormalize( - RelOptRuleCall call, List<RexNode> primaryKeyPredicates) { + /** + * Pushes {@param primaryKeyPredicates} and used fields project into the {@link + * StreamPhysicalChangelogNormalize}. 
+ */ + private StreamPhysicalChangelogNormalize pushCalcThroughChangelogNormalize( + RelOptRuleCall call, List<RexNode> primaryKeyPredicates, int[] usedInputFields) { final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(1); final StreamPhysicalExchange exchange = call.rel(2); + final Set<Integer> primaryKeyIndices = + IntStream.of(changelogNormalize.uniqueKeys()).boxed().collect(Collectors.toSet()); - if (primaryKeyPredicates.isEmpty()) { - // There are no filters which can be pushed, so just return the existing node. + if (primaryKeyPredicates.isEmpty() + && usedInputFields.length == changelogNormalize.getRowType().getFieldCount()) { + // There are no filters and no project which can be pushed, so just return the existing + // node. return changelogNormalize; } - final StreamPhysicalCalc pushedFiltersCalc = - projectIdentityWithConditions( - call.builder(), exchange.getInput(), primaryKeyPredicates); + final StreamPhysicalCalc pushedCalc = + projectUsedFieldsWithConditions( + call.builder(), exchange.getInput(), primaryKeyPredicates, usedInputFields); + + // build input field reference from old field index to new field index + final Map<Integer, Integer> inputRefMapping = buildFieldsMapping(usedInputFields); + final List<Integer> newPrimaryKeyIndices = + primaryKeyIndices.stream().map(inputRefMapping::get).collect(Collectors.toList()); + final FlinkRelDistribution newDistribution = + FlinkRelDistribution.hash(newPrimaryKeyIndices, true); + final RelTraitSet newTraitSet = exchange.getTraitSet().replace(newDistribution); final StreamPhysicalExchange newExchange = - (StreamPhysicalExchange) - exchange.copy( - exchange.getTraitSet(), - Collections.singletonList(pushedFiltersCalc)); + exchange.copy(newTraitSet, pushedCalc, newDistribution); return (StreamPhysicalChangelogNormalize) changelogNormalize.copy( - changelogNormalize.getTraitSet(), Collections.singletonList(newExchange)); + changelogNormalize.getTraitSet(), + newExchange, + 
newPrimaryKeyIndices.stream().mapToInt(Integer::intValue).toArray()); } /** - * Returns a {@link StreamPhysicalCalc} with the given {@param conditions} and an identity - * projection. + * Builds a new {@link StreamPhysicalCalc} on the input node with the given {@param conditions} + * and a used fields projection. */ - private StreamPhysicalCalc projectIdentityWithConditions( - RelBuilder relBuilder, RelNode newInput, List<RexNode> conditions) { - + private StreamPhysicalCalc projectUsedFieldsWithConditions( + RelBuilder relBuilder, RelNode input, List<RexNode> conditions, int[] usedFields) { + final RelDataType inputRowType = input.getRowType(); + final List<String> inputFieldNames = inputRowType.getFieldNames(); final RexProgramBuilder programBuilder = - new RexProgramBuilder(newInput.getRowType(), relBuilder.getRexBuilder()); - programBuilder.addIdentity(); + new RexProgramBuilder(inputRowType, relBuilder.getRexBuilder()); + + // add project + for (int fieldIndex : usedFields) { + programBuilder.addProject( + programBuilder.makeInputRef(fieldIndex), inputFieldNames.get(fieldIndex)); + } + // add conditions final RexNode condition = relBuilder.and(conditions); if (!condition.isAlwaysTrue()) { programBuilder.addCondition(condition); @@ -167,36 +220,13 @@ public class PushFilterPastChangelogNormalizeRule final RexProgram newProgram = programBuilder.getProgram(); return new StreamPhysicalCalc( - newInput.getCluster(), - newInput.getTraitSet(), - newInput, + input.getCluster(), + input.getTraitSet(), + input, newProgram, newProgram.getOutputRowType()); } - /** - * Returns a {@link StreamPhysicalCalc} which is a copy of {@param calc}, but with the - * projections applied from {@param projectFromCalc}. 
- */ - private StreamPhysicalCalc projectWith( - RelBuilder relBuilder, StreamPhysicalCalc projectFromCalc, StreamPhysicalCalc calc) { - final RexProgramBuilder programBuilder = - new RexProgramBuilder(calc.getRowType(), relBuilder.getRexBuilder()); - if (calc.getProgram().getCondition() != null) { - programBuilder.addCondition( - calc.getProgram().expandLocalRef(calc.getProgram().getCondition())); - } - - for (Pair<RexLocalRef, String> projectRef : - projectFromCalc.getProgram().getNamedProjects()) { - final RexNode project = projectFromCalc.getProgram().expandLocalRef(projectRef.left); - programBuilder.addProject(project, projectRef.right); - } - - final RexProgram newProgram = programBuilder.getProgram(); - return (StreamPhysicalCalc) calc.copy(calc.getTraitSet(), calc.getInput(), newProgram); - } - /** * Transforms the {@link RelOptRuleCall} to use {@param changelogNormalize} as the new input to * a {@link StreamPhysicalCalc} which uses {@param predicates} for the condition. @@ -204,21 +234,69 @@ public class PushFilterPastChangelogNormalizeRule private void transformWithRemainingPredicates( RelOptRuleCall call, StreamPhysicalChangelogNormalize changelogNormalize, - List<RexNode> predicates) { + List<RexNode> predicates, + int[] usedInputFields) { final StreamPhysicalCalc calc = call.rel(0); final RelBuilder relBuilder = call.builder(); + final RexProgramBuilder programBuilder = + new RexProgramBuilder(changelogNormalize.getRowType(), relBuilder.getRexBuilder()); + + final Map<Integer, Integer> inputRefMapping = buildFieldsMapping(usedInputFields); + + // add projects + for (Pair<RexLocalRef, String> ref : calc.getProgram().getNamedProjects()) { + RexNode shiftedProject = + adjustInputRef(calc.getProgram().expandLocalRef(ref.left), inputRefMapping); + programBuilder.addProject(shiftedProject, ref.right); + } - final StreamPhysicalCalc newCalc = - projectIdentityWithConditions(relBuilder, changelogNormalize, predicates); - final StreamPhysicalCalc 
newProjectedCalc = projectWith(relBuilder, calc, newCalc); + // add conditions + final List<RexNode> shiftedPredicates = + predicates.stream() + .map(p -> adjustInputRef(p, inputRefMapping)) + .collect(Collectors.toList()); + final RexNode condition = relBuilder.and(shiftedPredicates); + if (!condition.isAlwaysTrue()) { + programBuilder.addCondition(condition); + } - if (newProjectedCalc.getProgram().isTrivial()) { + final RexProgram newProgram = programBuilder.getProgram(); + if (newProgram.isTrivial()) { call.transformTo(changelogNormalize); } else { + final StreamPhysicalCalc newProjectedCalc = + new StreamPhysicalCalc( + changelogNormalize.getCluster(), + changelogNormalize.getTraitSet(), + changelogNormalize, + newProgram, + newProgram.getOutputRowType()); call.transformTo(newProjectedCalc); } } + /** Adjust the {@param expr} field indices according to the field index {@param mapping}. */ + private RexNode adjustInputRef(RexNode expr, Map<Integer, Integer> mapping) { + return expr.accept( + new RexShuttle() { + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + Integer newIndex = mapping.get(inputRef.getIndex()); + return new RexInputRef(newIndex, inputRef.getType()); + } + }); + } + + /** Build field reference mapping from old field index to new field index after projection. */ + private Map<Integer, Integer> buildFieldsMapping(int[] projectedInputRefs) { + final Map<Integer, Integer> fieldsOldToNewIndexMapping = new HashMap<>(); + for (int i = 0; i < projectedInputRefs.length; i++) { + fieldsOldToNewIndexMapping.put(projectedInputRefs[i], i); + } + return fieldsOldToNewIndexMapping; + } + // --------------------------------------------------------------------------------------------- /** Configuration for {@link PushFilterPastChangelogNormalizeRule}. 
*/ @@ -247,7 +325,6 @@ public class PushFilterPastChangelogNormalizeRule operandBuilder -> operandBuilder .operand(StreamPhysicalCalc.class) - .predicate(calc -> calc.getProgram().getCondition() != null) .oneInput(changelogNormalizeTransform); return withOperandSupplier(calcTransform).as(Config.class); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java index ca30965aad1..a7e9f1d9893 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java @@ -98,4 +98,68 @@ public class PushFilterPastChangelogNormalizeRuleTest extends TableTestBase { util.tableEnv().createTable("T", sourceDescriptor); util.verifyRelPlan("SELECT f0, f1 FROM T WHERE (f1 < 1 OR f2 > 10) AND f0 IS NOT NULL"); } + + @Test + public void testOnlyProjection() { + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", STRING()) + .column("f1", INT().notNull()) + .column("f2", STRING().notNull()) + .primaryKey("f1") + .build()) + .unboundedScanSource(ChangelogMode.upsert()) + .build(); + + util.tableEnv().createTable("T", sourceDescriptor); + util.verifyRelPlan("SELECT f1, f2 FROM T"); + } + + @Test + public void testFilterAndProjection() { + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", STRING()) + .column("f1", INT().notNull()) + .column("f2", BIGINT().notNull()) + .column("f3", STRING()) + .column("f4", BIGINT().notNull()) + .column("f5", 
BIGINT().notNull()) + .column("f6", BIGINT().notNull()) + .column("f7", BIGINT().notNull()) + .primaryKey("f1", "f2") + .build()) + .unboundedScanSource(ChangelogMode.upsert()) + .build(); + + util.tableEnv().createTable("T", sourceDescriptor); + util.verifyRelPlan("SELECT f1, f5 FROM T WHERE (f1 < 1 OR f2 > 10) AND f3 IS NOT NULL"); + } + + @Test + public void testPartialPrimaryKeyFilterAndProjection() { + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", STRING()) + .column("f1", INT().notNull()) + .column("f2", BIGINT().notNull()) + .column("f3", STRING()) + .column("f4", BIGINT().notNull()) + .column("f5", BIGINT().notNull()) + .column("f6", BIGINT().notNull()) + .column("f7", BIGINT().notNull()) + .primaryKey("f1", "f2") + .build()) + .unboundedScanSource(ChangelogMode.upsert()) + .build(); + + util.tableEnv().createTable("T", sourceDescriptor); + util.verifyRelPlan("SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml index 8c9d7a93b9d..7b5f30bbd2f 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml @@ -16,25 +16,24 @@ See the License for the specific language governing permissions and limitations under the License. 
--> <Root> - <TestCase name="testWithMultipleFilters"> + <TestCase name="testFilterAndProjection"> <Resource name="sql"> - <![CDATA[SELECT f1, SUM(f1) AS `sum` FROM T WHERE f1 < 10 AND (f1 > 3 OR f2 IS NULL) GROUP BY f1]]> + <![CDATA[SELECT f1, f5 FROM T WHERE (f1 < 1 OR f2 > 10) AND f3 IS NOT NULL]]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalAggregate(group=[{0}], sum=[SUM($0)]) -+- LogicalProject(f1=[$1]) - +- LogicalFilter(condition=[AND(<($1, 10), OR(>($1, 3), IS NULL($2)))]) - +- LogicalTableScan(table=[[default_catalog, default_database, T]]) +LogicalProject(f1=[$1], f5=[$5]) ++- LogicalFilter(condition=[AND(OR(<($1, 1), >($2, 10)), IS NOT NULL($3))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -Calc(select=[f1, f1 AS sum], where=[OR(>(f1, 3), IS NULL(f2))]) -+- ChangelogNormalize(key=[f1]) - +- Exchange(distribution=[hash[f1]]) - +- Calc(select=[f0, f1, f2], where=[<(f1, 10)]) - +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2]) +Calc(select=[f1, f5], where=[IS NOT NULL(f3)]) ++- ChangelogNormalize(key=[f1, f2]) + +- Exchange(distribution=[hash[f1, f2]]) + +- Calc(select=[f1, f2, f3, f5], where=[OR(<(f1, 1), >(f2, 10))]) + +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7]) ]]> </Resource> </TestCase> @@ -56,6 +55,46 @@ Calc(select=[f0, f1], where=[IS NOT NULL(f0)]) +- Exchange(distribution=[hash[f1, f2]]) +- Calc(select=[f0, f1, f2], where=[OR(<(f1, 1), >(f2, 10))]) +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnlyProjection"> + <Resource name="sql"> + <![CDATA[SELECT f1, f2 FROM T]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(f1=[$1], f2=[$2]) ++- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource 
name="optimized rel plan"> + <![CDATA[ +ChangelogNormalize(key=[f1]) ++- Exchange(distribution=[hash[f1]]) + +- Calc(select=[f1, f2]) + +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2]) +]]> + </Resource> + </TestCase> + <TestCase name="testPartialPrimaryKeyFilterAndProjection"> + <Resource name="sql"> + <![CDATA[SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(f1=[$1], f5=[$5]) ++- LogicalFilter(condition=[AND(<($1, 1), IS NOT NULL($3))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f1, f5], where=[IS NOT NULL(f3)]) ++- ChangelogNormalize(key=[f1, f2]) + +- Exchange(distribution=[hash[f1, f2]]) + +- Calc(select=[f1, f2, f3, f5], where=[<(f1, 1)]) + +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7]) ]]> </Resource> </TestCase> @@ -76,6 +115,28 @@ ChangelogNormalize(key=[f1]) +- Exchange(distribution=[hash[f1]]) +- Calc(select=[f0, f1], where=[<(f1, 1)]) +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1]) +]]> + </Resource> + </TestCase> + <TestCase name="testWithMultipleFilters"> + <Resource name="sql"> + <![CDATA[SELECT f1, SUM(f1) AS `sum` FROM T WHERE f1 < 10 AND (f1 > 3 OR f2 IS NULL) GROUP BY f1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0}], sum=[SUM($0)]) ++- LogicalProject(f1=[$1]) + +- LogicalFilter(condition=[AND(<($1, 10), OR(>($1, 3), IS NULL($2)))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f1, f1 AS sum], where=[OR(>(f1, 3), IS NULL(f2))]) ++- ChangelogNormalize(key=[f1]) + +- Exchange(distribution=[hash[f1]]) + +- Calc(select=[f1, f2], where=[<(f1, 10)]) + +- TableSourceScan(table=[[default_catalog, 
default_database, T]], fields=[f0, f1, f2]) ]]> </Resource> </TestCase>
