[
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22955:
-----------------------------------
Labels: stale-critical (was: )
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Critical but is unassigned and neither itself nor its Sub-Tasks have been
updated for 7 days. I have gone ahead and marked it "stale-critical". If this
ticket is critical, please either assign yourself or give an update.
Afterwards, please remove the label or in 7 days the issue will be
deprioritized.
> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
> Key: FLINK-22955
> URL: https://issues.apache.org/jira/browse/FLINK-22955
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3, 1.13.1, 1.12.4
> Environment: Flink 1.13.1
> how to reproduce: patch file attached
> Reporter: Cooper Luan
> Priority: Critical
> Labels: stale-critical
> Fix For: 1.11.4, 1.12.5, 1.13.2
>
> Attachments:
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> A query like the following may fail with a lookup function signature mismatch
> exception when the SQL is explained:
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> the lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }{code}
> exec plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> The "lookup=[age=10, id=a]" keys cause the signature mismatch: the filter
> age = 10 is pushed down into the lookup keys, but eval accepts only one key.
>
> But if I add one more insert, it works well:
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> exec plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> Here the LookupJoin node uses "lookup=[id=a]" (correct), not
> "lookup=[age=10, id=a]" (wrong).
>
> So in the "multi insert" case the planner works as expected, while
> in the "single insert" case the planner throws an exception.
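
The mismatch described in the quoted report can be illustrated with a small, self-contained sketch. It does not use Flink and is not the planner's actual validation code: `LookupSignatureSketch` and `hasMatchingEval` are hypothetical names, the stand-in `AsyncTableFunction1` only mirrors the shape of the reporter's `eval`, and the type of `age` is assumed to be an integer. It only shows why one lookup key (`id=a`) fits the declared `eval`, while two keys (`age=10, id=a`) do not.

```java
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class LookupSignatureSketch {

    // Stand-in for the reporter's function: a future plus ONE lookup key (Integer a).
    public static class AsyncTableFunction1 {
        public void eval(CompletableFuture<Collection<Object>> resultFuture, Integer a) {
            resultFuture.complete(Collections.emptyList());
        }
    }

    // Hypothetical check: is there a public eval(CompletableFuture, key1, key2, ...)
    // whose key parameters accept the given lookup key types?
    public static boolean hasMatchingEval(Class<?> function, Class<?>... keyTypes) {
        for (Method m : function.getMethods()) {
            if (!m.getName().equals("eval")) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            // One extra parameter for the result future.
            if (params.length != keyTypes.length + 1) {
                continue;
            }
            if (!CompletableFuture.class.isAssignableFrom(params[0])) {
                continue;
            }
            boolean allKeysFit = true;
            for (int i = 0; i < keyTypes.length; i++) {
                if (!params[i + 1].isAssignableFrom(keyTypes[i])) {
                    allKeysFit = false;
                    break;
                }
            }
            if (allKeysFit) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // lookup=[id=a]: one Integer key, fits eval(future, Integer).
        System.out.println(hasMatchingEval(AsyncTableFunction1.class, Integer.class));
        // lookup=[age=10, id=a]: two keys (age assumed Integer), no eval fits.
        System.out.println(hasMatchingEval(AsyncTableFunction1.class, Integer.class, Integer.class));
    }
}
```

Under these assumptions the first check succeeds and the second fails, which corresponds to the "multi insert" plan (one lookup key) and the "single insert" plan (two lookup keys) respectively.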
--
This message was sent by Atlassian Jira
(v8.3.4#803005)