[ 
https://issues.apache.org/jira/browse/FLINK-29120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-29120:
--------------------------------
    Description: 
As expected, a join hint should only affect the current query block, and should 
not affect the join strategy in subqueries and views.

But the current implementation behaves inconsistently:

Using the source tables of flink-tpch-test, the following join hint takes effect 
unexpectedly inside the view:
{code:java}
Flink SQL> create temporary view v1 as SELECT
>    p_name,
>    p_mfgr,
>    p_brand,
>    p_type,
>    s_name,
>    s_address
>  FROM
>    part,
>    supplier
>  WHERE p_partkey = s_suppkey;
[INFO] Execute statement succeed.

 


Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part)  */ * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], 
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
   +- LogicalJoin(condition=[true], joinType=[inner], 
joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
      :- LogicalTableScan(table=[[default_catalog, default_database, part]])
      +- LogicalTableScan(table=[[default_catalog, default_database, supplier]])

== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], 
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], 
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

{code}
 

Without the hint, the plan is:

{code}

Flink SQL> explain SELECT * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], 
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalTableScan(table=[[default_catalog, default_database, part]])
      +- LogicalTableScan(table=[[default_catalog, default_database, supplier]])

== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- HashJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], 
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
s_address], isBroadcast=[true], build=[right])
   :- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[broadcast])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, 
p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right])\n:- 
[#1] TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])\n+- [#2] 
Exchange(distribution=[broadcast])\n])
   :- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[broadcast])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

{code}

 

 

  was:
As expected, Join Hint should only affects the current query block, and does 
not affect the Join strategy in subquery and view.

But current implementation behaviors inconsistently:

use source tables of flink-tpch-test, the following join hint takes effect 
unexpectedly

{code}


Flink SQL> create temporary view v1 as SELECT
>    p_name,
>    p_mfgr,
>    p_brand,
>    p_type,
>    s_name,
>    s_address
>  FROM
>    part,
>    supplier
>  WHERE p_partkey = s_suppkey;
[INFO] Execute statement succeed.

 


Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part)  */ * from v1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], 
s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
   +- LogicalJoin(condition=[true], joinType=[inner], 
joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
      :- LogicalTableScan(table=[[default_catalog, default_database, part]])
      +- LogicalTableScan(table=[[default_catalog, default_database, supplier]])

== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], 
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], 
select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
s_name, s_address])

{code}

 

 

 


> Unexpected join hint propagation into view
> ------------------------------------------
>
>                 Key: FLINK-29120
>                 URL: https://issues.apache.org/jira/browse/FLINK-29120
>             Project: Flink
>          Issue Type: Bug
>            Reporter: lincoln lee
>            Priority: Blocker
>
> As expected, a join hint should only affect the current query block, and should 
> not affect the join strategy in subqueries and views.
> But the current implementation behaves inconsistently:
> Using the source tables of flink-tpch-test, the following join hint takes effect 
> unexpectedly inside the view:
> {code:java}
> Flink SQL> create temporary view v1 as SELECT
> >    p_name,
> >    p_mfgr,
> >    p_brand,
> >    p_type,
> >    s_name,
> >    s_address
> >  FROM
> >    part,
> >    supplier
> >  WHERE p_partkey = s_suppkey;
> [INFO] Execute statement succeed.
>  
> Flink SQL> explain SELECT /*+ SHUFFLE_MERGE(part)  */ * from v1;
> == Abstract Syntax Tree ==
> LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], 
> s_name=[$10], s_address=[$11])
> +- LogicalFilter(condition=[=($0, $9)])
>    +- LogicalJoin(condition=[true], joinType=[inner], 
> joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part]]]])
>       :- LogicalTableScan(table=[[default_catalog, default_database, part]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> supplier]])
> == Optimized Physical Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], 
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
> s_address])
>    :- Exchange(distribution=[hash[p_partkey]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
>    +- Exchange(distribution=[hash[s_suppkey]])
>       +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
> s_name, s_address])
> == Optimized Execution Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], 
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
> s_address])
>    :- Exchange(distribution=[hash[p_partkey]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, part, 
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
>    +- Exchange(distribution=[hash[s_suppkey]])
>       +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
> s_name, s_address])
> {code}
>  
> Without the hint, the plan is:
> {code}
> Flink SQL> explain SELECT * from v1;
> == Abstract Syntax Tree ==
> LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], 
> s_name=[$10], s_address=[$11])
> +- LogicalFilter(condition=[=($0, $9)])
>    +- LogicalJoin(condition=[true], joinType=[inner])
>       :- LogicalTableScan(table=[[default_catalog, default_database, part]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> supplier]])
> == Optimized Physical Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- HashJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], 
> select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, 
> s_address], isBroadcast=[true], build=[right])
>    :- TableSourceScan(table=[[default_catalog, default_database, part, 
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
>    +- Exchange(distribution=[broadcast])
>       +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
> s_name, s_address])
> == Optimized Execution Plan ==
> Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
> +- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
> where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, 
> p_type, s_suppkey, s_name, s_address], isBroadcast=[true], build=[right])\n:- 
> [#1] TableSourceScan(table=[[default_catalog, default_database, part, 
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])\n+- [#2] 
> Exchange(distribution=[broadcast])\n])
>    :- TableSourceScan(table=[[default_catalog, default_database, part, 
> project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], 
> fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
>    +- Exchange(distribution=[broadcast])
>       +- TableSourceScan(table=[[default_catalog, default_database, supplier, 
> project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, 
> s_name, s_address])
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to