[
https://issues.apache.org/jira/browse/FLINK-29120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lincoln lee updated FLINK-29120:
--------------------------------
Description:
By design, a join hint should only affect the current query block and should
not affect the join strategy inside subqueries and views.
However, the current implementation behaves inconsistently:
using the source tables of flink-tpch-test, the following join hint
unexpectedly takes effect on the join inside the view:
{code:java}
Flink SQL> create temporary view v1 as SELECT
> p_name,
> p_mfgr,
> p_brand,
> p_type,
> s_name,
> s_address
> FROM
> part,
> supplier
> WHERE p_partkey = s_suppkey;
[INFO] Execute statement succeed.
Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part) */ * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4],
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
+- LogicalJoin(condition=[true], joinType=[inner],
joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, part]])
+- LogicalTableScan(table=[[default_catalog, default_database, supplier]])
== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)],
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
s_address])
:- Exchange(distribution=[hash[p_partkey]])
: +- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[hash[s_suppkey]])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)],
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
s_address])
:- Exchange(distribution=[hash[p_partkey]])
: +- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[hash[s_suppkey]])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
{code}
Without the hint, the planner chooses a broadcast hash join instead:
{code}
Flink SQL> explain SELECT * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4],
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, part]])
+- LogicalTableScan(table=[[default_catalog, default_database, supplier]])
== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- HashJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)],
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
s_address], isBroadcast=[true], build=[right])
:- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[broadcast])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin],
where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand,
p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right])\n:-
[#1] TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])\n+- [#2]
Exchange(distribution=[broadcast])\n])
:- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[broadcast])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
{code}
was:
By design, a join hint should only affect the current query block and should
not affect the join strategy inside subqueries and views.
However, the current implementation behaves inconsistently:
using the source tables of flink-tpch-test, the following join hint
unexpectedly takes effect on the join inside the view:
{code}
Flink SQL> create temporary view v1 as SELECT
> p_name,
> p_mfgr,
> p_brand,
> p_type,
> s_name,
> s_address
> FROM
> part,
> supplier
> WHERE p_partkey = s_suppkey;
[INFO] Execute statement succeed.
Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part) */ * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4],
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
+- LogicalJoin(condition=[true], joinType=[inner],
joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, part]])
+- LogicalTableScan(table=[[default_catalog, default_database, supplier]])
== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)],
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
s_address])
:- Exchange(distribution=[hash[p_partkey]])
: +- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[hash[s_suppkey]])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)],
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
s_address])
:- Exchange(distribution=[hash[p_partkey]])
: +- TableSourceScan(table=[[default_catalog, default_database, part,
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
+- Exchange(distribution=[hash[s_suppkey]])
+- TableSourceScan(table=[[default_catalog, default_database, supplier,
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
s_name, s_address])
{code}
> Unexpected join hint propagation into view
> ------------------------------------------
>
> Key: FLINK-29120
> URL: https://issues.apache.org/jira/browse/FLINK-29120
> Project: Flink
> Issue Type: Bug
> Reporter: lincoln lee
> Priority: Blocker
>
> By design, a join hint should only affect the current query block and should
> not affect the join strategy inside subqueries and views.
> However, the current implementation behaves inconsistently:
> using the source tables of flink-tpch-test, the following join hint
> unexpectedly takes effect on the join inside the view:
> {code:java}
> Flink SQL> create temporary view v1 as SELECT
> > p_name,
> > p_mfgr,
> > p_brand,
> > p_type,
> > s_name,
> > s_address
> > FROM
> > part,
> > supplier
> > WHERE p_partkey = s_suppkey;
> [INFO] Execute statement succeed.
>
> Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part) */ * from v1;
> == Abstract Syntax Tree ==
> LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4],
> s_name=[$10], s_address=[$11])
> +- LogicalFilter(condition=[=($0, $9)])
> +- LogicalJoin(condition=[true], joinType=[inner],
> joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
> :- LogicalTableScan(table=[[default_catalog, default_database, part]])
> +- LogicalTableScan(table=[[default_catalog, default_database,
> supplier]])
> == Optimized Physical Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)],
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
> s_address])
> :- Exchange(distribution=[hash[p_partkey]])
> : +- TableSourceScan(table=[[default_catalog, default_database, part,
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
> +- Exchange(distribution=[hash[s_suppkey]])
> +- TableSourceScan(table=[[default_catalog, default_database, supplier,
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
> s_name, s_address])
> == Optimized Execution Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)],
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
> s_address])
> :- Exchange(distribution=[hash[p_partkey]])
> : +- TableSourceScan(table=[[default_catalog, default_database, part,
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
> +- Exchange(distribution=[hash[s_suppkey]])
> +- TableSourceScan(table=[[default_catalog, default_database, supplier,
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
> s_name, s_address])
> {code}
>
> Without the hint, the planner chooses a broadcast hash join instead:
> {code}
> Flink SQL> explain SELECT * from v1;
> == Abstract Syntax Tree ==
> LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4],
> s_name=[$10], s_address=[$11])
> +- LogicalFilter(condition=[=($0, $9)])
> +- LogicalJoin(condition=[true], joinType=[inner])
> :- LogicalTableScan(table=[[default_catalog, default_database, part]])
> +- LogicalTableScan(table=[[default_catalog, default_database,
> supplier]])
> == Optimized Physical Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- HashJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)],
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name,
> s_address], isBroadcast=[true], build=[right])
> :- TableSourceScan(table=[[default_catalog, default_database, part,
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
> +- Exchange(distribution=[broadcast])
> +- TableSourceScan(table=[[default_catalog, default_database, supplier,
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
> s_name, s_address])
> == Optimized Execution Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin],
> where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand,
> p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right])\n:-
> [#1] TableSourceScan(table=[[default_catalog, default_database, part,
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])\n+- [#2]
> Exchange(distribution=[broadcast])\n])
> :- TableSourceScan(table=[[default_catalog, default_database, part,
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]],
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
> +- Exchange(distribution=[broadcast])
> +- TableSourceScan(table=[[default_catalog, default_database, supplier,
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey,
> s_name, s_address])
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)