This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7d6d67808 [FLINK-32844][table-planner] Does not apply runtime filter 
and DPP on same key
6b7d6d67808 is described below

commit 6b7d6d67808b7fcdd6fc93222c6c7055c4343475
Author: Lijie Wang <[email protected]>
AuthorDate: Tue Aug 15 14:08:27 2023 +0800

    [FLINK-32844][table-planner] Does not apply runtime filter and DPP on same 
key
    
    This closes #23215
---
 .../program/FlinkRuntimeFilterProgram.java         | 13 +++++++
 .../utils/DynamicPartitionPruningUtils.java        |  3 +-
 ...chPhysicalDynamicFilteringTableSourceScan.scala |  9 +++--
 .../program/FlinkRuntimeFilterProgramTest.java     | 45 ++++++++++++++++++++++
 .../program/FlinkRuntimeFilterProgramTest.xml      | 37 ++++++++++++++++++
 5 files changed, 103 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
index 4acb6e4097d..b393913a3b6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.optimize.program;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
@@ -52,9 +53,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -474,6 +477,16 @@ public class FlinkRuntimeFilterProgram implements 
FlinkOptimizeProgram<BatchOpti
                         tryPushDownProbeAndInjectRuntimeFilter(input, 
probeIndices, buildSideInfo));
             }
             return union.copy(union.getTraitSet(), newInputs, union.all);
+        } else if (rel instanceof 
BatchPhysicalDynamicFilteringTableSourceScan) {
+            BatchPhysicalDynamicFilteringTableSourceScan tableScan =
+                    (BatchPhysicalDynamicFilteringTableSourceScan) rel;
+            final Set<Integer> dynamicFilteringIndices =
+                    new HashSet<>(tableScan.dynamicFilteringIndices());
+            if (dynamicFilteringIndices.containsAll(probeIndices)) {
+                // do nothing, return current probe side directly, because the 
fields have already
+                // been filtered by DPP.
+                return rel;
+            }
         } else {
             // the above cases can cover all cases of TPC-DS test
             // we may find more cases later
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index 90f7b40bc0b..d6cde62b6ee 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -346,7 +346,8 @@ public class DynamicPartitionPruningUtils {
                         scan.getTraitSet(),
                         scan.getHints(),
                         tableSourceTable,
-                        dynamicFilteringDataCollector);
+                        dynamicFilteringDataCollector,
+                        acceptedFieldIndices);
             } else if (rel instanceof Exchange || rel instanceof Filter) {
                 return rel.copy(
                         rel.getTraitSet(),
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
index 90fbc030845..1826e3b0c04 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
@@ -40,7 +40,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
     traitSet: RelTraitSet,
     hints: util.List[RelHint],
     tableSourceTable: TableSourceTable,
-    var input: RelNode) // var for updating
+    var input: RelNode, // var for updating
+    val dynamicFilteringIndices: util.List[Integer])
   extends BatchPhysicalTableSourceScan(cluster, traitSet, hints, 
tableSourceTable) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
@@ -49,7 +50,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
       traitSet,
       getHints,
       tableSourceTable,
-      inputs.get(0))
+      inputs.get(0),
+      dynamicFilteringIndices)
   }
 
   override def copy(
@@ -60,7 +62,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
       traitSet,
       getHints,
       tableSourceTable,
-      input)
+      input,
+      dynamicFilteringIndices)
   }
 
   override def replaceInput(ordinalInParent: Int, rel: RelNode): Unit = {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
index d6c28fb487a..0e031b7cbee 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.planner.plan.optimize.program;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
@@ -33,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 
 /** Test for {@link FlinkRuntimeFilterProgram}. */
 public class FlinkRuntimeFilterProgramTest extends TableTestBase {
@@ -388,6 +391,48 @@ public class FlinkRuntimeFilterProgramTest extends 
TableTestBase {
         util.verifyPlan(query);
     }
 
+    @Test
+    public void testDoesNotApplyRuntimeFilterAndDPPOnSameKey() throws 
Exception {
+        // runtime filter will not be injected, because DPP is already applied on the 
key
+        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
+        createPartitionedFactTable(SUITABLE_FACT_ROW_COUNT);
+        String query =
+                "select * from dim, fact_part where fact_part.fact_date_sk = 
dim.dim_date_sk and dim.price < 500";
+        util.verifyPlan(query);
+    }
+
+    private void createPartitionedFactTable(long rowCount) throws Exception {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE fact_part (\n"
+                                + "  id BIGINT,\n"
+                                + "  name STRING,\n"
+                                + "  amount BIGINT,\n"
+                                + "  price BIGINT,\n"
+                                + "  fact_date_sk BIGINT\n"
+                                + ") PARTITIONED BY (fact_date_sk)\n"
+                                + "WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'runtime-source' = 'NewSource',\n"
+                                + " 'partition-list' = 
'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n"
+                                + " 'dynamic-filtering-fields' = 
'fact_date_sk;amount',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        final CatalogPartitionSpec partSpec =
+                new 
CatalogPartitionSpec(Collections.singletonMap("fact_date_sk", "666"));
+        catalog.createPartition(
+                new ObjectPath(util.getTableEnv().getCurrentDatabase(), 
"fact_part"),
+                partSpec,
+                new CatalogPartitionImpl(new HashMap<>(), ""),
+                true);
+        catalog.alterPartitionStatistics(
+                new ObjectPath(util.getTableEnv().getCurrentDatabase(), 
"fact_part"),
+                partSpec,
+                new CatalogTableStatistics(rowCount, 10, 1000L, 2000L),
+                true);
+    }
+
     private void setupSuitableTableStatistics() throws Exception {
         setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
         setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
index 3d0014c6d0e..4cfe355fa24 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
@@ -1010,6 +1010,43 @@ HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id,
 :     +- TableSourceScan(table=[[testCatalog, test_database, fact, filter=[], 
project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, 
price, fact_date_sk])
 +- Exchange(distribution=[hash[dim_date_sk]])
    +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testDoesNotApplyRuntimeFilterAndDPPOnSameKey">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($9, $4), <($3, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, male, amount, price, dim_date_sk, id0, name, amount0, price0, 
fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+   +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])
+      +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+         +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+            +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], 
select=[id, male, amount, price, dim_date_sk, id0, name, amount0, price0, 
fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+   +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])
+      +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+         +- Reused(reference_id=[1])
 ]]>
                </Resource>
        </TestCase>

Reply via email to