This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b7d6d67808 [FLINK-32844][table-planner] Does not apply runtime filter
and DPP on same key
6b7d6d67808 is described below
commit 6b7d6d67808b7fcdd6fc93222c6c7055c4343475
Author: Lijie Wang <[email protected]>
AuthorDate: Tue Aug 15 14:08:27 2023 +0800
[FLINK-32844][table-planner] Does not apply runtime filter and DPP on same
key
This closes #23215
---
.../program/FlinkRuntimeFilterProgram.java | 13 +++++++
.../utils/DynamicPartitionPruningUtils.java | 3 +-
...chPhysicalDynamicFilteringTableSourceScan.scala | 9 +++--
.../program/FlinkRuntimeFilterProgramTest.java | 45 ++++++++++++++++++++++
.../program/FlinkRuntimeFilterProgramTest.xml | 37 ++++++++++++++++++
5 files changed, 103 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
index 4acb6e4097d..b393913a3b6 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.optimize.program;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
@@ -52,9 +53,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -474,6 +477,16 @@ public class FlinkRuntimeFilterProgram implements
FlinkOptimizeProgram<BatchOpti
tryPushDownProbeAndInjectRuntimeFilter(input,
probeIndices, buildSideInfo));
}
return union.copy(union.getTraitSet(), newInputs, union.all);
+ } else if (rel instanceof
BatchPhysicalDynamicFilteringTableSourceScan) {
+ BatchPhysicalDynamicFilteringTableSourceScan tableScan =
+ (BatchPhysicalDynamicFilteringTableSourceScan) rel;
+ final Set<Integer> dynamicFilteringIndices =
+ new HashSet<>(tableScan.dynamicFilteringIndices());
+ if (dynamicFilteringIndices.containsAll(probeIndices)) {
+ // Do nothing and return the current probe side directly, because the fields
+ // have already been filtered by DPP.
+ return rel;
+ }
} else {
// the above cases can cover all cases of TPC-DS test
// we may find more cases later
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index 90f7b40bc0b..d6cde62b6ee 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -346,7 +346,8 @@ public class DynamicPartitionPruningUtils {
scan.getTraitSet(),
scan.getHints(),
tableSourceTable,
- dynamicFilteringDataCollector);
+ dynamicFilteringDataCollector,
+ acceptedFieldIndices);
} else if (rel instanceof Exchange || rel instanceof Filter) {
return rel.copy(
rel.getTraitSet(),
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
index 90fbc030845..1826e3b0c04 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
@@ -40,7 +40,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
traitSet: RelTraitSet,
hints: util.List[RelHint],
tableSourceTable: TableSourceTable,
- var input: RelNode) // var for updating
+ var input: RelNode, // var for updating
+ val dynamicFilteringIndices: util.List[Integer])
extends BatchPhysicalTableSourceScan(cluster, traitSet, hints,
tableSourceTable) {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
@@ -49,7 +50,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
traitSet,
getHints,
tableSourceTable,
- inputs.get(0))
+ inputs.get(0),
+ dynamicFilteringIndices)
}
override def copy(
@@ -60,7 +62,8 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
traitSet,
getHints,
tableSourceTable,
- input)
+ input,
+ dynamicFilteringIndices)
}
override def replaceInput(ordinalInParent: Int, rel: RelNode): Unit = {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
index d6c28fb487a..0e031b7cbee 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.planner.plan.optimize.program;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
@@ -33,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
+import java.util.HashMap;
/** Test for {@link FlinkRuntimeFilterProgram}. */
public class FlinkRuntimeFilterProgramTest extends TableTestBase {
@@ -388,6 +391,48 @@ public class FlinkRuntimeFilterProgramTest extends
TableTestBase {
util.verifyPlan(query);
}
+ @Test
+ public void testDoesNotApplyRuntimeFilterAndDPPOnSameKey() throws
Exception {
+ // The runtime filter should not be injected, because DPP has already been applied on the join key.
+ setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
+ createPartitionedFactTable(SUITABLE_FACT_ROW_COUNT);
+ String query =
+ "select * from dim, fact_part where fact_part.fact_date_sk =
dim.dim_date_sk and dim.price < 500";
+ util.verifyPlan(query);
+ }
+
+ private void createPartitionedFactTable(long rowCount) throws Exception {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE fact_part (\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT,\n"
+ + " fact_date_sk BIGINT\n"
+ + ") PARTITIONED BY (fact_date_sk)\n"
+ + "WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'runtime-source' = 'NewSource',\n"
+ + " 'partition-list' =
'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n"
+ + " 'dynamic-filtering-fields' =
'fact_date_sk;amount',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ // Register one catalog partition and attach statistics with row count = rowCount (presumably so the planner's estimate for fact_part uses it).
+ final CatalogPartitionSpec partSpec =
+ new
CatalogPartitionSpec(Collections.singletonMap("fact_date_sk", "666"));
+ catalog.createPartition(
+ new ObjectPath(util.getTableEnv().getCurrentDatabase(),
"fact_part"),
+ partSpec,
+ new CatalogPartitionImpl(new HashMap<>(), ""),
+ true);
+ catalog.alterPartitionStatistics(
+ new ObjectPath(util.getTableEnv().getCurrentDatabase(),
"fact_part"),
+ partSpec,
+ new CatalogTableStatistics(rowCount, 10, 1000L, 2000L),
+ true);
+ }
+
private void setupSuitableTableStatistics() throws Exception {
setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
index 3d0014c6d0e..4cfe355fa24 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml
@@ -1010,6 +1010,43 @@ HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id,
: +- TableSourceScan(table=[[testCatalog, test_database, fact, filter=[],
project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount,
price, fact_date_sk])
+- Exchange(distribution=[hash[dim_date_sk]])
+- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+
+ <TestCase name="testDoesNotApplyRuntimeFilterAndDPPOnSameKey">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4],
id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($9, $4), <($3, 500))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)],
select=[id, male, amount, price, dim_date_sk, id0, name, amount0, price0,
fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+: +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price,
500)])
+: +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]],
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+ +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])
+ +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ +- Calc(select=[id, male, amount, price, dim_date_sk],
where=[<(price, 500)])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim,
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)],
select=[id, male, amount, price, dim_date_sk, id0, name, amount0, price0,
fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+: +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price <
500)])(reuse_id=[1])
+: +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]],
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+ +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])
+ +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>