This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f53ef1fad8f45083cbad34eaaf2a67fb7fa9df4
Author: Jark Wu <[email protected]>
AuthorDate: Fri Mar 10 22:46:47 2023 +0800

    [FLINK-30129][table] Push projection through ChangelogNormalize
    
    This closes #22156
---
 .../PushFilterPastChangelogNormalizeRule.java      | 225 ++++++++++++++-------
 .../PushFilterPastChangelogNormalizeRuleTest.java  |  64 ++++++
 .../PushFilterPastChangelogNormalizeRuleTest.xml   |  83 +++++++-
 3 files changed, 287 insertions(+), 85 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java
index c9ef634d369..d9178061f9f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRule.java
@@ -22,16 +22,21 @@ import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLocalRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.Pair;
@@ -40,7 +45,9 @@ import org.immutables.value.Value;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -48,15 +55,16 @@ import java.util.stream.IntStream;
 import static 
org.apache.flink.table.planner.plan.utils.RexNodeExtractor.extractRefInputFields;
 
 /**
- * Pushes primary key filters through a {@link 
StreamPhysicalChangelogNormalize ChangelogNormalize}
- * operator to reduce its state size.
+ * Pushes primary key filters and a projection of used fields through a {@link
+ * StreamPhysicalChangelogNormalize ChangelogNormalize} operator to reduce its state size.
  *
  * <p>This rule looks for Calc → ChangelogNormalize where the {@link 
StreamPhysicalCalc Calc}
- * contains a filter condition. The condition is transformed into CNF and then 
each conjunction is
- * tested for whether it affects only primary key columns. If such conditions 
exist, they are moved
- * into a new, separate Calc and pushed through the ChangelogNormalize 
operator. ChangelogNormalize
- * keeps state for every unique key it encounters, thus pushing filters on the 
primary key in front
- * of it helps reduce the size of its state.
+ * contains a filter condition or a projection. The condition is transformed into CNF and then
+ * each conjunction is tested for whether it affects only primary key columns. If such conditions
+ * exist, or if unused fields can be projected away, they are moved into a new, separate Calc and
+ * pushed through the ChangelogNormalize operator. ChangelogNormalize keeps state for every unique
+ * key it encounters, thus pushing primary key filters and a projection of the value fields in
+ * front of it helps reduce the size of its state.
  *
  * <p>Note that pushing primary key filters is safe to do, but pushing any 
other filters can lead to
  * incorrect results.
@@ -77,31 +85,55 @@ public class PushFilterPastChangelogNormalizeRule
     public void onMatch(RelOptRuleCall call) {
         final StreamPhysicalCalc calc = call.rel(0);
         final StreamPhysicalChangelogNormalize changelogNormalize = 
call.rel(1);
-
-        final RexProgram program = calc.getProgram();
-        final RexNode condition =
-                RexUtil.toCnf(
-                        call.builder().getRexBuilder(),
-                        program.expandLocalRef(program.getCondition()));
-
         final Set<Integer> primaryKeyIndices =
                 
IntStream.of(changelogNormalize.uniqueKeys()).boxed().collect(Collectors.toSet());
 
         // Determine which filters can be pushed (= involve only primary key 
columns)
         final List<RexNode> primaryKeyPredicates = new ArrayList<>();
         final List<RexNode> otherPredicates = new ArrayList<>();
-        partitionPrimaryKeyPredicates(
-                RelOptUtil.conjunctions(condition),
-                primaryKeyIndices,
-                primaryKeyPredicates,
-                otherPredicates);
+        final RexProgram program = calc.getProgram();
+        if (program.getCondition() != null) {
+            final RexNode condition =
+                    RexUtil.toCnf(
+                            call.builder().getRexBuilder(),
+                            program.expandLocalRef(program.getCondition()));
+            partitionPrimaryKeyPredicates(
+                    RelOptUtil.conjunctions(condition),
+                    primaryKeyIndices,
+                    primaryKeyPredicates,
+                    otherPredicates);
+        }
+
+        // used input field indices
+        int[] usedInputFields = extractUsedInputFields(calc, 
primaryKeyIndices);
 
-        // Construct a new ChangelogNormalize which has primary key filters 
pushed into it
+        // Construct a new ChangelogNormalize which has used fields project
+        // and primary key filters pushed into it
         final StreamPhysicalChangelogNormalize newChangelogNormalize =
-                pushFiltersThroughChangelogNormalize(call, 
primaryKeyPredicates);
+                pushCalcThroughChangelogNormalize(call, primaryKeyPredicates, 
usedInputFields);
 
         // Retain only filters which haven't been pushed
-        transformWithRemainingPredicates(call, newChangelogNormalize, 
otherPredicates);
+        transformWithRemainingPredicates(
+                call, newChangelogNormalize, otherPredicates, usedInputFields);
+    }
+
+    /** Extracts the input fields that are used in the Calc node and the ChangelogNormalize node. */
+    private int[] extractUsedInputFields(StreamPhysicalCalc calc, Set<Integer> 
primaryKeyIndices) {
+        RexProgram program = calc.getProgram();
+        List<RexNode> projectsAndCondition =
+                program.getProjectList().stream()
+                        .map(program::expandLocalRef)
+                        .collect(Collectors.toList());
+        if (program.getCondition() != null) {
+            
projectsAndCondition.add(program.expandLocalRef(program.getCondition()));
+        }
+        Set<Integer> projectedFields =
+                Arrays.stream(extractRefInputFields(projectsAndCondition))
+                        .boxed()
+                        .collect(Collectors.toSet());
+        // primary key fields must always be retained in the projection
+        projectedFields.addAll(primaryKeyIndices);
+        return 
projectedFields.stream().sorted().mapToInt(Integer::intValue).toArray();
     }
 
     /**
@@ -123,43 +155,64 @@ public class PushFilterPastChangelogNormalizeRule
         }
     }
 
-    /** Pushes {@param primaryKeyPredicates} into the {@link 
StreamPhysicalChangelogNormalize}. */
-    private StreamPhysicalChangelogNormalize 
pushFiltersThroughChangelogNormalize(
-            RelOptRuleCall call, List<RexNode> primaryKeyPredicates) {
+    /**
+     * Pushes {@param primaryKeyPredicates} and a projection of used fields into the {@link
+     * StreamPhysicalChangelogNormalize}.
+     */
+    private StreamPhysicalChangelogNormalize pushCalcThroughChangelogNormalize(
+            RelOptRuleCall call, List<RexNode> primaryKeyPredicates, int[] 
usedInputFields) {
         final StreamPhysicalChangelogNormalize changelogNormalize = 
call.rel(1);
         final StreamPhysicalExchange exchange = call.rel(2);
+        final Set<Integer> primaryKeyIndices =
+                
IntStream.of(changelogNormalize.uniqueKeys()).boxed().collect(Collectors.toSet());
 
-        if (primaryKeyPredicates.isEmpty()) {
-            // There are no filters which can be pushed, so just return the 
existing node.
+        if (primaryKeyPredicates.isEmpty()
+                && usedInputFields.length == 
changelogNormalize.getRowType().getFieldCount()) {
+            // There are no filters and no project which can be pushed, so 
just return the existing
+            // node.
             return changelogNormalize;
         }
 
-        final StreamPhysicalCalc pushedFiltersCalc =
-                projectIdentityWithConditions(
-                        call.builder(), exchange.getInput(), 
primaryKeyPredicates);
+        final StreamPhysicalCalc pushedCalc =
+                projectUsedFieldsWithConditions(
+                        call.builder(), exchange.getInput(), 
primaryKeyPredicates, usedInputFields);
+
+        // build input field reference from old field index to new field index
+        final Map<Integer, Integer> inputRefMapping = 
buildFieldsMapping(usedInputFields);
+        final List<Integer> newPrimaryKeyIndices =
+                
primaryKeyIndices.stream().map(inputRefMapping::get).collect(Collectors.toList());
 
+        final FlinkRelDistribution newDistribution =
+                FlinkRelDistribution.hash(newPrimaryKeyIndices, true);
+        final RelTraitSet newTraitSet = 
exchange.getTraitSet().replace(newDistribution);
         final StreamPhysicalExchange newExchange =
-                (StreamPhysicalExchange)
-                        exchange.copy(
-                                exchange.getTraitSet(),
-                                Collections.singletonList(pushedFiltersCalc));
+                exchange.copy(newTraitSet, pushedCalc, newDistribution);
 
         return (StreamPhysicalChangelogNormalize)
                 changelogNormalize.copy(
-                        changelogNormalize.getTraitSet(), 
Collections.singletonList(newExchange));
+                        changelogNormalize.getTraitSet(),
+                        newExchange,
+                        
newPrimaryKeyIndices.stream().mapToInt(Integer::intValue).toArray());
     }
 
     /**
-     * Returns a {@link StreamPhysicalCalc} with the given {@param conditions} 
and an identity
-     * projection.
+     * Builds a new {@link StreamPhysicalCalc} on the input node with the given
+     * {@param conditions} and a projection of the used fields.
      */
-    private StreamPhysicalCalc projectIdentityWithConditions(
-            RelBuilder relBuilder, RelNode newInput, List<RexNode> conditions) 
{
-
+    private StreamPhysicalCalc projectUsedFieldsWithConditions(
+            RelBuilder relBuilder, RelNode input, List<RexNode> conditions, 
int[] usedFields) {
+        final RelDataType inputRowType = input.getRowType();
+        final List<String> inputFieldNames = inputRowType.getFieldNames();
         final RexProgramBuilder programBuilder =
-                new RexProgramBuilder(newInput.getRowType(), 
relBuilder.getRexBuilder());
-        programBuilder.addIdentity();
+                new RexProgramBuilder(inputRowType, 
relBuilder.getRexBuilder());
+
+        // add project
+        for (int fieldIndex : usedFields) {
+            programBuilder.addProject(
+                    programBuilder.makeInputRef(fieldIndex), 
inputFieldNames.get(fieldIndex));
+        }
 
+        // add conditions
         final RexNode condition = relBuilder.and(conditions);
         if (!condition.isAlwaysTrue()) {
             programBuilder.addCondition(condition);
@@ -167,36 +220,13 @@ public class PushFilterPastChangelogNormalizeRule
 
         final RexProgram newProgram = programBuilder.getProgram();
         return new StreamPhysicalCalc(
-                newInput.getCluster(),
-                newInput.getTraitSet(),
-                newInput,
+                input.getCluster(),
+                input.getTraitSet(),
+                input,
                 newProgram,
                 newProgram.getOutputRowType());
     }
 
-    /**
-     * Returns a {@link StreamPhysicalCalc} which is a copy of {@param calc}, 
but with the
-     * projections applied from {@param projectFromCalc}.
-     */
-    private StreamPhysicalCalc projectWith(
-            RelBuilder relBuilder, StreamPhysicalCalc projectFromCalc, 
StreamPhysicalCalc calc) {
-        final RexProgramBuilder programBuilder =
-                new RexProgramBuilder(calc.getRowType(), 
relBuilder.getRexBuilder());
-        if (calc.getProgram().getCondition() != null) {
-            programBuilder.addCondition(
-                    
calc.getProgram().expandLocalRef(calc.getProgram().getCondition()));
-        }
-
-        for (Pair<RexLocalRef, String> projectRef :
-                projectFromCalc.getProgram().getNamedProjects()) {
-            final RexNode project = 
projectFromCalc.getProgram().expandLocalRef(projectRef.left);
-            programBuilder.addProject(project, projectRef.right);
-        }
-
-        final RexProgram newProgram = programBuilder.getProgram();
-        return (StreamPhysicalCalc) calc.copy(calc.getTraitSet(), 
calc.getInput(), newProgram);
-    }
-
     /**
      * Transforms the {@link RelOptRuleCall} to use {@param 
changelogNormalize} as the new input to
      * a {@link StreamPhysicalCalc} which uses {@param predicates} for the 
condition.
@@ -204,21 +234,69 @@ public class PushFilterPastChangelogNormalizeRule
     private void transformWithRemainingPredicates(
             RelOptRuleCall call,
             StreamPhysicalChangelogNormalize changelogNormalize,
-            List<RexNode> predicates) {
+            List<RexNode> predicates,
+            int[] usedInputFields) {
         final StreamPhysicalCalc calc = call.rel(0);
         final RelBuilder relBuilder = call.builder();
+        final RexProgramBuilder programBuilder =
+                new RexProgramBuilder(changelogNormalize.getRowType(), 
relBuilder.getRexBuilder());
+
+        final Map<Integer, Integer> inputRefMapping = 
buildFieldsMapping(usedInputFields);
+
+        // add projects
+        for (Pair<RexLocalRef, String> ref : 
calc.getProgram().getNamedProjects()) {
+            RexNode shiftedProject =
+                    adjustInputRef(calc.getProgram().expandLocalRef(ref.left), 
inputRefMapping);
+            programBuilder.addProject(shiftedProject, ref.right);
+        }
 
-        final StreamPhysicalCalc newCalc =
-                projectIdentityWithConditions(relBuilder, changelogNormalize, 
predicates);
-        final StreamPhysicalCalc newProjectedCalc = projectWith(relBuilder, 
calc, newCalc);
+        // add conditions
+        final List<RexNode> shiftedPredicates =
+                predicates.stream()
+                        .map(p -> adjustInputRef(p, inputRefMapping))
+                        .collect(Collectors.toList());
+        final RexNode condition = relBuilder.and(shiftedPredicates);
+        if (!condition.isAlwaysTrue()) {
+            programBuilder.addCondition(condition);
+        }
 
-        if (newProjectedCalc.getProgram().isTrivial()) {
+        final RexProgram newProgram = programBuilder.getProgram();
+        if (newProgram.isTrivial()) {
             call.transformTo(changelogNormalize);
         } else {
+            final StreamPhysicalCalc newProjectedCalc =
+                    new StreamPhysicalCalc(
+                            changelogNormalize.getCluster(),
+                            changelogNormalize.getTraitSet(),
+                            changelogNormalize,
+                            newProgram,
+                            newProgram.getOutputRowType());
             call.transformTo(newProjectedCalc);
         }
     }
 
+    /** Adjusts the field indices of {@param expr} according to the field index {@param mapping}. */
+    private RexNode adjustInputRef(RexNode expr, Map<Integer, Integer> 
mapping) {
+        return expr.accept(
+                new RexShuttle() {
+
+                    @Override
+                    public RexNode visitInputRef(RexInputRef inputRef) {
+                        Integer newIndex = mapping.get(inputRef.getIndex());
+                        return new RexInputRef(newIndex, inputRef.getType());
+                    }
+                });
+    }
+
+    /** Builds a field reference mapping from old field index to new field index after projection. */
+    private Map<Integer, Integer> buildFieldsMapping(int[] projectedInputRefs) 
{
+        final Map<Integer, Integer> fieldsOldToNewIndexMapping = new 
HashMap<>();
+        for (int i = 0; i < projectedInputRefs.length; i++) {
+            fieldsOldToNewIndexMapping.put(projectedInputRefs[i], i);
+        }
+        return fieldsOldToNewIndexMapping;
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     /** Configuration for {@link PushFilterPastChangelogNormalizeRule}. */
@@ -247,7 +325,6 @@ public class PushFilterPastChangelogNormalizeRule
                     operandBuilder ->
                             operandBuilder
                                     .operand(StreamPhysicalCalc.class)
-                                    .predicate(calc -> 
calc.getProgram().getCondition() != null)
                                     .oneInput(changelogNormalizeTransform);
 
             return withOperandSupplier(calcTransform).as(Config.class);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java
index ca30965aad1..a7e9f1d9893 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.java
@@ -98,4 +98,68 @@ public class PushFilterPastChangelogNormalizeRuleTest 
extends TableTestBase {
         util.tableEnv().createTable("T", sourceDescriptor);
         util.verifyRelPlan("SELECT f0, f1 FROM T WHERE (f1 < 1 OR f2 > 10) AND 
f0 IS NOT NULL");
     }
+
+    @Test
+    public void testOnlyProjection() {
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", STRING())
+                                        .column("f1", INT().notNull())
+                                        .column("f2", STRING().notNull())
+                                        .primaryKey("f1")
+                                        .build())
+                        .unboundedScanSource(ChangelogMode.upsert())
+                        .build();
+
+        util.tableEnv().createTable("T", sourceDescriptor);
+        util.verifyRelPlan("SELECT f1, f2 FROM T");
+    }
+
+    @Test
+    public void testFilterAndProjection() {
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", STRING())
+                                        .column("f1", INT().notNull())
+                                        .column("f2", BIGINT().notNull())
+                                        .column("f3", STRING())
+                                        .column("f4", BIGINT().notNull())
+                                        .column("f5", BIGINT().notNull())
+                                        .column("f6", BIGINT().notNull())
+                                        .column("f7", BIGINT().notNull())
+                                        .primaryKey("f1", "f2")
+                                        .build())
+                        .unboundedScanSource(ChangelogMode.upsert())
+                        .build();
+
+        util.tableEnv().createTable("T", sourceDescriptor);
+        util.verifyRelPlan("SELECT f1, f5 FROM T WHERE (f1 < 1 OR f2 > 10) AND 
f3 IS NOT NULL");
+    }
+
+    @Test
+    public void testPartialPrimaryKeyFilterAndProjection() {
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", STRING())
+                                        .column("f1", INT().notNull())
+                                        .column("f2", BIGINT().notNull())
+                                        .column("f3", STRING())
+                                        .column("f4", BIGINT().notNull())
+                                        .column("f5", BIGINT().notNull())
+                                        .column("f6", BIGINT().notNull())
+                                        .column("f7", BIGINT().notNull())
+                                        .primaryKey("f1", "f2")
+                                        .build())
+                        .unboundedScanSource(ChangelogMode.upsert())
+                        .build();
+
+        util.tableEnv().createTable("T", sourceDescriptor);
+        util.verifyRelPlan("SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT 
NULL");
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml
index 8c9d7a93b9d..7b5f30bbd2f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushFilterPastChangelogNormalizeRuleTest.xml
@@ -16,25 +16,24 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testWithMultipleFilters">
+  <TestCase name="testFilterAndProjection">
     <Resource name="sql">
-      <![CDATA[SELECT f1, SUM(f1) AS `sum` FROM T WHERE f1 < 10 AND (f1 > 3 OR 
f2 IS NULL) GROUP BY f1]]>
+      <![CDATA[SELECT f1, f5 FROM T WHERE (f1 < 1 OR f2 > 10) AND f3 IS NOT 
NULL]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{0}], sum=[SUM($0)])
-+- LogicalProject(f1=[$1])
-   +- LogicalFilter(condition=[AND(<($1, 10), OR(>($1, 3), IS NULL($2)))])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(f1=[$1], f5=[$5])
++- LogicalFilter(condition=[AND(OR(<($1, 1), >($2, 10)), IS NOT NULL($3))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f1, f1 AS sum], where=[OR(>(f1, 3), IS NULL(f2))])
-+- ChangelogNormalize(key=[f1])
-   +- Exchange(distribution=[hash[f1]])
-      +- Calc(select=[f0, f1, f2], where=[<(f1, 10)])
-         +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2])
+Calc(select=[f1, f5], where=[IS NOT NULL(f3)])
++- ChangelogNormalize(key=[f1, f2])
+   +- Exchange(distribution=[hash[f1, f2]])
+      +- Calc(select=[f1, f2, f3, f5], where=[OR(<(f1, 1), >(f2, 10))])
+         +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2, f3, f4, f5, f6, f7])
 ]]>
     </Resource>
   </TestCase>
@@ -56,6 +55,46 @@ Calc(select=[f0, f1], where=[IS NOT NULL(f0)])
    +- Exchange(distribution=[hash[f1, f2]])
       +- Calc(select=[f0, f1, f2], where=[OR(<(f1, 1), >(f2, 10))])
          +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnlyProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT f1, f2 FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$1], f2=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ChangelogNormalize(key=[f1])
++- Exchange(distribution=[hash[f1]])
+   +- Calc(select=[f1, f2])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialPrimaryKeyFilterAndProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$1], f5=[$5])
++- LogicalFilter(condition=[AND(<($1, 1), IS NOT NULL($3))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f1, f5], where=[IS NOT NULL(f3)])
++- ChangelogNormalize(key=[f1, f2])
+   +- Exchange(distribution=[hash[f1, f2]])
+      +- Calc(select=[f1, f2, f3, f5], where=[<(f1, 1)])
+         +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2, f3, f4, f5, f6, f7])
 ]]>
     </Resource>
   </TestCase>
@@ -76,6 +115,28 @@ ChangelogNormalize(key=[f1])
 +- Exchange(distribution=[hash[f1]])
    +- Calc(select=[f0, f1], where=[<(f1, 1)])
       +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithMultipleFilters">
+    <Resource name="sql">
+      <![CDATA[SELECT f1, SUM(f1) AS `sum` FROM T WHERE f1 < 10 AND (f1 > 3 OR 
f2 IS NULL) GROUP BY f1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], sum=[SUM($0)])
++- LogicalProject(f1=[$1])
+   +- LogicalFilter(condition=[AND(<($1, 10), OR(>($1, 3), IS NULL($2)))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f1, f1 AS sum], where=[OR(>(f1, 3), IS NULL(f2))])
++- ChangelogNormalize(key=[f1])
+   +- Exchange(distribution=[hash[f1]])
+      +- Calc(select=[f1, f2], where=[<(f1, 10)])
+         +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0, f1, f2])
 ]]>
     </Resource>
   </TestCase>

Reply via email to