HunterHunter created FLINK-25396:
------------------------------------

             Summary: lookupjoin source table for pre-partitioning
                 Key: FLINK-25396
                 URL: https://issues.apache.org/jira/browse/FLINK-25396
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: HunterHunter


When we perform a lookup join against an external table, we should partition 
the input by the join key first, so that all rows with the same key go to the 
same task. This reduces the number of external queries and lets each task cache 
only its own subset of the external data instead of the full key set.
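
To make the caching argument concrete, here is a minimal simulation, not Flink code. It assumes each task keeps an unbounded per-task cache of looked-up keys, uses round-robin routing as a stand-in for "any key may arrive at any task", and uses CRC32 as a stand-in hash function; the names `simulate`, `route_round_robin` and `route_by_key` are hypothetical.

```python
import zlib
from collections import defaultdict


def route_round_robin(index, key, parallelism):
    # Assumption: without pre-partitioning, a key can land on any task.
    return index % parallelism


def route_by_key(index, key, parallelism):
    # Stand-in for hash partitioning on the join key (CRC32 is deterministic).
    return zlib.crc32(key.encode("utf-8")) % parallelism


def simulate(keys, parallelism, route):
    """Return (external lookups, cached keys per task) for a routing function."""
    caches = defaultdict(set)
    lookups = 0
    for index, key in enumerate(keys):
        task = route(index, key, parallelism)
        if key not in caches[task]:
            lookups += 1  # cache miss: query the external system
            caches[task].add(key)
    return lookups, {task: len(cache) for task, cache in caches.items()}


# 10,000 input rows over 100 distinct join keys, 3 parallel lookup tasks.
keys = [f"word-{i % 100}" for i in range(10_000)]
rr_lookups, rr_cache = simulate(keys, 3, route_round_robin)
hash_lookups, hash_cache = simulate(keys, 3, route_by_key)

print("round robin:", rr_lookups, rr_cache)
print("hash by key:", hash_lookups, hash_cache)
```

In this toy setup every task ends up caching all 100 keys under round-robin routing, while hash routing queries each key once and splits the 100 cached keys across the tasks.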

Example:

select * from sourceTable t1 LEFT JOIN lookuptable FOR SYSTEM_TIME AS 
OF t1.proctime as t2 ON t1.msg = t2.word

The execution plan looks like this:
{code:java}
== Optimized Execution Plan ==
Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
+- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
   +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
{code}
As an optimization, I added a hint option (lookup.join.pre-partition) and a 
planner rule that generates an Exchange, so that the input is pre-partitioned 
by the join key when external data is fetched synchronously:

select * from test t1 LEFT JOIN hbaselookup /*+ 
OPTIONS('lookup.join.pre-partition'='true') */ FOR SYSTEM_TIME AS OF 
t1.proctime as t2 ON t1.msg = t2.word
{code:java}
== Optimized Execution Plan ==
Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
+- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
   +- Exchange(distribution=[hash[msg]])
      +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
         +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
 {code}
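
The effect of such a rule can be sketched on a toy plan tree. This is only an illustration under stated assumptions: `Node`, `add_pre_partition`, `explain`, `build_plan` and the `left_key` attribute are hypothetical names, and nothing here uses the Flink planner API or reflects how the actual rule is implemented.

```python
from dataclasses import dataclass, field
from typing import Dict, List

HINT = "lookup.join.pre-partition"  # option name taken from the issue text


@dataclass
class Node:
    """Hypothetical, simplified plan node (not a Flink class)."""
    name: str
    attrs: Dict[str, str] = field(default_factory=dict)
    inputs: List["Node"] = field(default_factory=list)


def add_pre_partition(node: Node) -> Node:
    """Insert a hash Exchange below every LookupJoin that carries the hint."""
    inputs = [add_pre_partition(child) for child in node.inputs]
    if node.name == "LookupJoin" and node.attrs.get(HINT) == "true":
        distribution = "hash[" + node.attrs["left_key"] + "]"
        inputs = [
            child if child.name == "Exchange"  # do not stack exchanges
            else Node("Exchange", {"distribution": distribution}, [child])
            for child in inputs
        ]
    return Node(node.name, dict(node.attrs), inputs)


def explain(node: Node, depth: int = 0) -> List[str]:
    """Render node names in the indented style of the plans above."""
    prefix = "" if depth == 0 else "   " * (depth - 1) + "+- "
    lines = [prefix + node.name]
    for child in node.inputs:
        lines.extend(explain(child, depth + 1))
    return lines


def build_plan(pre_partition: str) -> Node:
    scan = Node("TableSourceScan")
    calc = Node("Calc", inputs=[scan])
    join = Node("LookupJoin", {"left_key": "msg", HINT: pre_partition}, [calc])
    return Node("Calc", inputs=[join])


rewritten = add_pre_partition(build_plan("true"))
unchanged = add_pre_partition(build_plan("false"))
print("\n".join(explain(rewritten)))
```

With the hint set to 'true' the toy rewrite places an Exchange between the LookupJoin and its input; with any other value the plan shape is left as it was.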
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
