[ https://issues.apache.org/jira/browse/FLINK-32321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752710#comment-17752710 ]
Yunhong Zheng commented on FLINK-32321:
---------------------------------------
Hi [~jark], could you assign this issue to me? Thanks!
I think we shouldn't delete a constant lookup condition on the primary key, such
as 'rule_type = 0', when we push that constant condition down to the source.
Otherwise the lookup join may be left with no join condition at all after the
push-down, as the following case in streaming mode (Flink 1.18) shows:
{code:java}
util.addTable("""
|CREATE TABLE MyTable (
| `a` INT,
| `b` STRING,
| `c` INT,
| `proctime` AS PROCTIME()
|) WITH (
| 'connector' = 'values'
|)
|""".stripMargin)
util.addTable("""
|CREATE TABLE LookupTableWithFilterPushDown (
| `id` INT,
| `name` STRING,
| `age` INT,
| PRIMARY KEY(age) NOT ENFORCED
|) WITH (
| 'connector' = 'values',
| 'filterable-fields' = 'age'
|)
|""".stripMargin)
val sql =
"""
| SELECT * FROM MyTable AS T LEFT JOIN LookupTableWithFilterPushDown
| FOR SYSTEM_TIME AS OF T.proctime D
| ON D.age = 100
|""".stripMargin
util.verifyExecPlan(sql)
{code}
The resulting physical plan is:
{code:java}
optimize result:
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTableWithFilterPushDown], joinType=[LeftOuterJoin], lookup=[], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
I think this will produce a wrong LookupJoin result, because no equality join
condition remains after the filter is pushed down.
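A minimal Java sketch of the concern, under stated assumptions: it models only how lookup keys might be derived from the join condition and is not Flink planner code. The `LookupKey` class and the `dropPushedConstants` / `keepPkConstants` helpers are hypothetical names. The first helper reproduces the behaviour visible in the plan (a constant key whose column was pushed into the source disappears from `lookup=[...]`); the second keeps constant keys on primary-key columns, as proposed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of lookup-key derivation; not Flink planner code.
public class LookupKeySketch {

    // A lookup key on a dimension-table column: either a constant
    // (age = 100) or a reference to a probe-side field (ip = ne_ip).
    static final class LookupKey {
        final String column;
        final String value;
        final boolean constant;

        LookupKey(String column, String value, boolean constant) {
            this.column = column;
            this.value = value;
            this.constant = constant;
        }

        @Override
        public String toString() {
            return column + "=" + value;
        }
    }

    // Behaviour seen in the plan: a constant key whose column was pushed
    // into the source is removed from the lookup keys.
    static List<LookupKey> dropPushedConstants(List<LookupKey> keys, Set<String> pushed) {
        List<LookupKey> result = new ArrayList<>();
        for (LookupKey k : keys) {
            if (k.constant && pushed.contains(k.column)) {
                continue; // dropped, presumably because the source now applies the filter
            }
            result.add(k);
        }
        return result;
    }

    // Proposed behaviour: a constant key on a primary-key column is kept
    // even if the same predicate was also pushed down to the source.
    static List<LookupKey> keepPkConstants(List<LookupKey> keys, Set<String> pushed, Set<String> pk) {
        List<LookupKey> result = new ArrayList<>();
        for (LookupKey k : keys) {
            if (k.constant && pushed.contains(k.column) && !pk.contains(k.column)) {
                continue; // only non-key constants may be dropped
            }
            result.add(k);
        }
        return result;
    }

    public static void main(String[] args) {
        // ON D.age = 100, with 'filterable-fields' = 'age' and PRIMARY KEY(age)
        List<LookupKey> keys = List.of(new LookupKey("age", "100", true));
        Set<String> pushed = Set.of("age");
        Set<String> pk = Set.of("age");

        System.out.println("current:  lookup=" + dropPushedConstants(keys, pushed));
        System.out.println("proposed: lookup=" + keepPkConstants(keys, pushed, pk));
    }
}
```

Running `main` prints `lookup=[]` for the current behaviour and `lookup=[age=100]` for the proposed one. Applied to the reporter's query, and assuming `rule_type` is a pushed-down primary-key column as the wording above suggests, the two helpers give `[ip=ne_ip]` (matching the 1.17.1 plan quoted in the issue) and `[rule_type=0, ip=ne_ip]` (matching the 1.17.0 plan) respectively. The model ignores the separate `where=[...]` filter.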
> Temporal Join job missing condition after “ON”
> ----------------------------------------------
>
> Key: FLINK-32321
> URL: https://issues.apache.org/jira/browse/FLINK-32321
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Gateway
> Affects Versions: 1.17.1
> Reporter: macdoor615
> Priority: Major
>
> We have a SQL job like this:
> {code:java}
> select ... from prod_kafka.f_alarm_tag_dev
> /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */ as f
> left join mysql_bnpmp.gem_bnpmp.f_a_alarm_filter
> /*+ OPTIONS('lookup.cache.max-rows' = '5000',
> 'lookup.cache.ttl' = '30s') */
> FOR SYSTEM_TIME AS OF f.proctime ff on ff.rule_type = 0 and f.ne_ip = ff.ip
> {code}
> We submitted it to a Flink 1.17.1 cluster through the SQL Gateway and found
> that the job detail is missing the lookup condition (rule_type=0):
> {code:java}
> +- [1196]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[ip=ne_ip], select=[event_id,
> {code}
> We submitted the same SQL to a Flink 1.17.0 cluster through the SQL Gateway,
> and there the (rule_type=0) lookup condition is present:
> {code:java}
> +- [3]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[rule_type=0, ip=ne_ip], where=[(rule_type = 0)], select=[event_id, severity,
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)