[ https://issues.apache.org/jira/browse/FLINK-32321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752710#comment-17752710 ]

Yunhong Zheng commented on FLINK-32321:
---------------------------------------

Hi [~jark], could you assign this issue to me? Thanks!

I think we shouldn't delete a constant lookup condition on the primary key, such as 'rule_type = 0', when that constant condition is pushed down to the source. Doing so can leave the lookup join without any join condition after push-down, as the following case shows in streaming mode (Flink 1.18):
{code:java}
util.addTable("""
                |CREATE TABLE MyTable (
                |  `a` INT,
                |  `b` STRING,
                |  `c` INT,
                |  `proctime` AS PROCTIME()
                |) WITH (
                |  'connector' = 'values'
                |)
                |""".stripMargin)

util.addTable("""
                |CREATE TABLE LookupTableWithFilterPushDown (
                |  `id` INT,
                |  `name` STRING,
                |  `age` INT,
                |  PRIMARY KEY(age) NOT ENFORCED
                |) WITH (
                |  'connector' = 'values',
                |  'filterable-fields' = 'age'
                |)
                |""".stripMargin)
val sql =
  """
    | SELECT * FROM MyTable AS T LEFT JOIN LookupTableWithFilterPushDown
    | FOR SYSTEM_TIME AS OF T.proctime D
    | ON D.age = 100
    |""".stripMargin
util.verifyExecPlan(sql)

{code}
The resulting physical plan is:
{code:java}
optimize result: 
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTableWithFilterPushDown], joinType=[LeftOuterJoin], lookup=[], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
I think this will produce a wrong lookup join result, because no equality join condition is left after the filter push-down (the plan shows lookup=[]).
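To make the wrong-result concern concrete, here is a minimal, hypothetical in-memory model of a left lookup join in plain Scala; it is not Flink planner or connector code. It assumes the lookup source matches rows only on the keys it is given and does not re-apply the filter that was pushed into it. Dim, lookup, leftLookupJoin and the sample rows are illustrative names and data. The three key sets mirror the 1.17.0 plan from the issue (lookup=[rule_type=0, ip=ne_ip]), the 1.17.1 plan (lookup=[ip=ne_ip]) and the lookup=[] plan above.

```scala
// Hypothetical in-memory model of a left lookup join; NOT Flink planner or connector code.
object LookupJoinSketch {
  // Dimension row, loosely modeled on f_a_alarm_filter from the issue (ip, rule_type).
  final case class Dim(ip: String, ruleType: Int)

  // Hypothetical lookup source: returns rows whose fields equal every given key value.
  // Assumption: it does NOT re-apply a filter that was pushed into the source.
  def lookup(table: Seq[Dim], keys: Map[String, Any]): Seq[Dim] =
    table.filter { row =>
      keys.forall {
        case ("ip", v)        => row.ip == v
        case ("rule_type", v) => row.ruleType == v
        case _                => false // unknown key field: never matches
      }
    }

  // Left lookup join: one output per matching row, or (probe, None) if nothing matches.
  def leftLookupJoin(
      probe: Seq[String],
      table: Seq[Dim],
      keysFor: String => Map[String, Any]
  ): Seq[(String, Option[Dim])] =
    probe.flatMap { neIp =>
      val matches = lookup(table, keysFor(neIp))
      if (matches.isEmpty) Seq(neIp -> Option.empty[Dim])
      else matches.map(row => neIp -> Option(row))
    }

  val dim: Seq[Dim] = Seq(Dim("10.0.0.1", 0), Dim("10.0.0.1", 1))
  val probe: Seq[String] = Seq("10.0.0.1")

  // Keys as in the 1.17.0 plan: lookup=[rule_type=0, ip=ne_ip]
  val withConstant = leftLookupJoin(probe, dim, ip => Map("rule_type" -> 0, "ip" -> ip))
  // Keys as in the 1.17.1 plan: lookup=[ip=ne_ip] (rule_type = 0 dropped)
  val withoutConstant = leftLookupJoin(probe, dim, ip => Map("ip" -> ip))
  // Keys as in the lookup=[] plan: no join condition left at all
  val noKeys = leftLookupJoin(probe, dim, _ => Map.empty)

  def main(args: Array[String]): Unit = {
    println(s"with rule_type = 0 key: $withConstant")
    println(s"without rule_type key:  $withoutConstant")
    println(s"without any lookup key: $noKeys")
  }
}
```

Under that assumption, dropping the constant key lets the rule_type = 1 row join as well, and with no key at all every dimension row is joined to each probe row. Whether a real connector re-applies a pushed-down filter during lookup is connector-specific and is not modeled here.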

> Temporal Join job missing condition after “ON”
> ----------------------------------------------
>
>                 Key: FLINK-32321
>                 URL: https://issues.apache.org/jira/browse/FLINK-32321
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Gateway
>    Affects Versions: 1.17.1
>            Reporter: macdoor615
>            Priority: Major
>
> We have a SQL job like this:
> {code:java}
> select ... from prod_kafka.f_alarm_tag_dev
>  /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */ as f 
> left join mysql_bnpmp.gem_bnpmp.f_a_alarm_filter
>  /*+ OPTIONS('lookup.cache.max-rows' = '5000',
>  'lookup.cache.ttl' = '30s') */
> FOR SYSTEM_TIME AS OF f.proctime ff  on ff.rule_type = 0 and f.ne_ip = ff.ip 
> {code}
> We submitted it to a Flink 1.17.1 cluster via the SQL Gateway and found that the job detail is missing the lookup condition (rule_type=0):
> {code:java}
>   +- [1196]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[ip=ne_ip], select=[event_id, {code}
> We submitted the same SQL to a Flink 1.17.0 cluster via the SQL Gateway, and there the (rule_type=0) lookup condition is present:
> {code:java}
>   +- [3]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[rule_type=0, ip=ne_ip], where=[(rule_type = 0)], select=[event_id, severity,{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to