godfreyhe commented on code in PR #21489:
URL: https://github.com/apache/flink/pull/21489#discussion_r1063054685


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -137,6 +140,20 @@ private static RelNode convertDppFactSide(
                     || !(tableSource instanceof ScanTableSource)) {
                 return rel;
             }
+
+            // Dpp cannot success if source support aggregate push down, source aggregate push
+            // down enabled is true and aggregate push down success.
+            if (tableSource instanceof SupportsAggregatePushDown
+                    && ShortcutUtils.unwrapContext(rel)
+                            .getTableConfig()
+                            .get(
+                                    OptimizerConfigOptions
+                                            .TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)
+                    && Arrays.stream(tableSourceTable.abilitySpecs())

Review Comment:
   It is enough to check
   `Arrays.stream(tableSourceTable.abilitySpecs()).anyMatch(spec -> spec instanceof AggregatePushDownSpec)`
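
   A minimal, self-contained sketch of that single check, presumably sufficient because a spec only shows up in the ability specs after a push-down was actually applied. The `AbilitySpecStub` types below are hypothetical stand-ins, not the real Flink planner classes:

```java
import java.util.Arrays;

/** Hypothetical stand-in for the planner's ability spec type (assumption, not the Flink class). */
interface AbilitySpecStub {}

/** Hypothetical spec recorded when a filter was pushed into the source. */
final class FilterPushDownSpecStub implements AbilitySpecStub {}

/** Hypothetical spec recorded when an aggregate was pushed into the source. */
final class AggregatePushDownSpecStub implements AbilitySpecStub {}

final class AbilitySpecCheckSketch {

    /** Returns true if any recorded ability spec is an aggregate push-down spec. */
    static boolean hasAggregatePushDown(AbilitySpecStub[] abilitySpecs) {
        return Arrays.stream(abilitySpecs)
                .anyMatch(spec -> spec instanceof AggregatePushDownSpecStub);
    }

    public static void main(String[] args) {
        AbilitySpecStub[] specs = {
            new FilterPushDownSpecStub(), new AggregatePushDownSpecStub()
        };
        // With an aggregate push-down spec present, DPP would be skipped for this source.
        System.out.println(hasAggregatePushDown(specs));
    }
}
```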



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition prune in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ *  * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
+ *  * +- Exchange(distribution=[broadcast])
+ *  *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *  *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>We use a {@link FlinkOptimizeProgram} instead of a {@link org.apache.calcite.plan.RelRule} to
+ * realize dynamic partition pruning because the {@link org.apache.calcite.plan.hep.HepPlanner} in
+ * Flink doesn't support matching a simple join, and replacing one node on one side of the join
+ * node. After that, rebuilding this join node. This is a defect of the existing optimizer, and it's
+ * matching pattern need to be simpler. Only then can we use {@link org.apache.calcite.plan.RelRule}
+ * to achieve dpp.
+ */
+public class FlinkDynamicPartitionPruningProgram
+        implements FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(RelNode rel) {
+                        if (!ShortcutUtils.unwrapContext(rel)
+                                .getTableConfig()
+                                .get(
+                                        OptimizerConfigOptions
+                                                .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+                            return rel;
+                        }

Review Comment:
   Move this check to the beginning of the `optimize` method.
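
   A rough sketch of the intended shape, assuming the option cannot change during one traversal: test the flag once up front and skip the traversal entirely when it is disabled. `Node`, `visited`, and the boolean parameter are hypothetical stand-ins for `RelNode`, the shuttle, and the config lookup:

```java
import java.util.ArrayList;
import java.util.List;

final class EarlyExitSketch {

    /** Hypothetical plan node standing in for RelNode. */
    static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    /** Counts visited nodes so the effect of the early exit is observable. */
    static int visited = 0;

    /** The flag is checked once here instead of once per visited node. */
    static Node optimize(Node root, boolean dynamicFilteringEnabled) {
        if (!dynamicFilteringEnabled) {
            return root; // nothing to rewrite, no traversal at all
        }
        return visit(root);
    }

    /** Stand-in for the shuttle: walks the tree bottom-up and returns the node unchanged. */
    private static Node visit(Node node) {
        visited++;
        for (int i = 0; i < node.inputs.size(); i++) {
            node.inputs.set(i, visit(node.inputs.get(i)));
        }
        return node;
    }
}
```

   With the check inside `visit`, a disabled option would still walk every node only to return it unchanged.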



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -45,153 +52,177 @@
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.util.ImmutableIntList;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Planner utils for Dynamic partition Pruning. */
 public class DynamicPartitionPruningUtils {
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
-    }
-
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
-        }
-
-        JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
-        }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
-    }
-
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
-    }
-
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
-        DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
-    }
-
     /**
      * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
      * need not consider the join keys in dim side, which already deal by dynamic partition pruning
      * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
      * changed, this RelNode is not dim side.
      */
-    private static boolean isDimSide(RelNode rel) {
+    public static boolean isDimSide(RelNode rel) {
         DppDimSideFactors dimSideFactors = new DppDimSideFactors();
         visitDimSide(rel, dimSideFactors);
         return dimSideFactors.isDimSide();

Review Comment:
   Rename `DppDimSideFactors` to `DppDimSideChecker` and extract the logic into `DppDimSideChecker`?
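
   One possible shape for such a checker, as a sketch only: the class owns both the traversal and the flags it collects, so callers just ask `isDimSide()`. `PlanNode`, the node kinds, and the "has a filter, has no unsupported node" criteria are hypothetical and do not mirror the actual conditions in the PR:

```java
import java.util.Arrays;
import java.util.List;

final class DimSideCheckerSketch {

    /** Hypothetical plan node standing in for RelNode; kind is e.g. "Scan" or "Filter". */
    static final class PlanNode {
        final String kind;
        final List<PlanNode> inputs;

        PlanNode(String kind, PlanNode... inputs) {
            this.kind = kind;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final PlanNode root;
    private boolean hasFilter;
    private boolean hasUnsupportedNode;

    DimSideCheckerSketch(PlanNode root) {
        this.root = root;
    }

    /** Runs the traversal and evaluates the (hypothetical) dim side criteria. */
    boolean isDimSide() {
        hasFilter = false;
        hasUnsupportedNode = false;
        visit(root);
        return hasFilter && !hasUnsupportedNode;
    }

    /** Collects the flags; the logic lives in the checker rather than in a utility class. */
    private void visit(PlanNode node) {
        switch (node.kind) {
            case "Filter":
                hasFilter = true;
                break;
            case "Scan":
            case "Project":
                break;
            default:
                hasUnsupportedNode = true;
        }
        for (PlanNode input : node.inputs) {
            visit(input);
        }
    }
}
```

   The utility method would then reduce to something like `return new DimSideCheckerSketch(rel).isDimSide();`.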



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition prune in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ *  * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
+ *  * +- Exchange(distribution=[broadcast])
+ *  *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *  *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>We use a {@link FlinkOptimizeProgram} instead of a {@link org.apache.calcite.plan.RelRule} to
+ * realize dynamic partition pruning because the {@link org.apache.calcite.plan.hep.HepPlanner} in
+ * Flink doesn't support matching a simple join, and replacing one node on one side of the join
+ * node. After that, rebuilding this join node. This is a defect of the existing optimizer, and it's
+ * matching pattern need to be simpler. Only then can we use {@link org.apache.calcite.plan.RelRule}
+ * to achieve dpp.

Review Comment:
    * <p>NOTE: We use a {@link FlinkOptimizeProgram} instead of a {@link
    * org.apache.calcite.plan.RelRule} here because the {@link org.apache.calcite.plan.hep.HepPlanner}
    * doesn't support matching a partially determined pattern or dynamically replacing the inputs of
    * matched nodes. Once we improve HepPlanner, this class can be converted to RelRule.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -45,153 +52,177 @@
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.util.ImmutableIntList;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Planner utils for Dynamic partition Pruning. */
 public class DynamicPartitionPruningUtils {
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
-    }
-
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
-        }
-
-        JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
-        }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
-    }
-
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
-    }
-
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
-        DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
-    }
-
     /**
      * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
      * need not consider the join keys in dim side, which already deal by dynamic partition pruning
      * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
      * changed, this RelNode is not dim side.
      */
-    private static boolean isDimSide(RelNode rel) {
+    public static boolean isDimSide(RelNode rel) {
         DppDimSideFactors dimSideFactors = new DppDimSideFactors();
         visitDimSide(rel, dimSideFactors);
         return dimSideFactors.isDimSide();
     }
 
-    /**
-     * Visit fact side to judge whether fact side has partition table, partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey) {
+        DppFactSideFactors factSideFactors = new DppFactSideFactors();

Review Comment:
   Ditto: the same renaming and extraction applies to `DppFactSideFactors`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
