[ 
https://issues.apache.org/jira/browse/FLINK-25396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462941#comment-17462941
 ] 

Jing Zhang edited comment on FLINK-25396 at 12/21/21, 2:50 AM:
---------------------------------------------------------------

[~HunterHunter] Thanks for reporting this optimization.
However, I think this is a duplicate of
[FLINK-23687|https://issues.apache.org/jira/browse/FLINK-23687].
I would close this JIRA; we could continue the discussion in
[FLINK-23687|https://issues.apache.org/jira/browse/FLINK-23687]. WDYT?


was (Author: qingru zhang):
[~HunterHunter] Thanks for reporting this optimization.
However, I think this is a duplicate of
[FLINK-23687|https://issues.apache.org/jira/browse/FLINK-23687].


> lookupjoin source table for pre-partitioning
> --------------------------------------------
>
>                 Key: FLINK-25396
>                 URL: https://issues.apache.org/jira/browse/FLINK-25396
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: HunterHunter
>            Priority: Major
>
> When we do a lookup join against an external table, we should pre-partition by the join key so that records with the same key go to the same task. This reduces the number of external queries and lets each task cache a disjoint slice of the external data, instead of every task potentially caching the whole table (see the sketch after the first plan below).
> Example:
> select * from sourceTable t1 LEFT JOIN lookuptable FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word
> The execution plan looks like:
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
> +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
>    +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
> {code}
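> For illustration only (not from the original issue): a minimal Java sketch of the routing idea above, assuming a hypothetical key extractor and parallelism. Hash-partitioning by the join key (msg) sends identical keys to the same subtask, so each subtask only ever needs to look up (and cache) a disjoint key set.
> {code:java}
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
>
> public class PrePartitionSketch {
>
>     // The same join key always maps to the same subtask index, mirroring
>     // what an Exchange(distribution=[hash[msg]]) would do for the lookup join.
>     static int subtaskFor(String joinKey, int parallelism) {
>         return Math.floorMod(joinKey.hashCode(), parallelism);
>     }
>
>     public static void main(String[] args) {
>         int parallelism = 4;
>         List<String> joinKeys = List.of("alpha", "beta", "alpha", "gamma", "beta");
>
>         // Which keys each subtask would have to query (and could cache).
>         Map<Integer, Set<String>> keysPerSubtask = new HashMap<>();
>         for (String key : joinKeys) {
>             keysPerSubtask
>                 .computeIfAbsent(subtaskFor(key, parallelism), t -> new HashSet<>())
>                 .add(key);
>         }
>
>         // The key sets of different subtasks are disjoint, so each task's
>         // lookup cache holds only a slice of the external table.
>         keysPerSubtask.forEach((task, keys) ->
>             System.out.println("subtask " + task + " caches keys " + keys));
>     }
> }
> {code}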
> To implement this optimization, I added a hint option (lookup.join.pre-partition) and a planner rule that generates an Exchange, so the input is pre-partitioned by the join key before the synchronous lookup (see the cache sketch after the second plan below):
> select * from test t1 LEFT JOIN hbaselookup /*+ OPTIONS('lookup.join.pre-partition'='true') */ FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
> +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
>    +- Exchange(distribution=[hash[msg]])
>       +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
>          +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
>  {code}
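> A second illustrative sketch (hypothetical, not the actual Flink connector code): a per-subtask LRU cache in front of the external store. When the input is pre-partitioned by the join key, each subtask keeps seeing the same small key set, so the hit rate of such a cache rises and the number of external queries drops.
> {code:java}
> import java.util.LinkedHashMap;
> import java.util.Map;
> import java.util.function.Function;
>
> /** Hypothetical per-subtask lookup cache; names and sizes are illustrative. */
> public class SubtaskLookupCache<K, V> {
>
>     private final Map<K, V> cache;
>     private final Function<K, V> externalLookup; // e.g. one GET against HBase per key
>
>     public SubtaskLookupCache(int maxEntries, Function<K, V> externalLookup) {
>         this.externalLookup = externalLookup;
>         // Simple LRU eviction via an access-ordered LinkedHashMap.
>         this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
>             @Override
>             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
>                 return size() > maxEntries;
>             }
>         };
>     }
>
>     public V get(K joinKey) {
>         // With pre-partitioning, this subtask is only asked about "its" keys,
>         // so most calls hit the cache instead of the external system.
>         return cache.computeIfAbsent(joinKey, externalLookup);
>     }
> }
> {code}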
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
