[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364807#comment-17364807 ]

JING ZHANG commented on FLINK-22955:
------------------------------------

Hi [~gsavl], as we discussed on the mailing list, the root cause of the exception is that, after the optimizer pushes the filter age=10 down into the LookupJoin, the LookupJoin looks for an eval method that matches two lookup keys, 'age' and 'id'.

There is a possible improvement that would avoid throwing an exception: if no eval method matching both lookup keys is found, fall back to a method that matches one lookup key. The dimension table then returns all records matching 'id=a', and a downstream Calc filters the output by 'age=10'.
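A minimal sketch in plain Scala of what this fallback would compute, assuming an in-memory dimension table. `DimRow`, `FallbackLookup` and its methods are hypothetical names used only for illustration, not Flink API. Looking up by 'id' alone and applying 'age=10' afterwards should return the same rows as a lookup on both keys:

{code:scala}
// Hypothetical in-memory stand-in for the dimension table; not a Flink API.
final case class DimRow(id: Int, name: String, age: Int)

object FallbackLookup {
  // Lookup with only the 'id' key: returns every row whose id matches.
  def lookupById(table: Seq[DimRow], id: Int): Seq[DimRow] =
    table.filter(_.id == id)

  // The proposed fallback: look up by 'id' alone, then apply the pushed-down
  // predicate (age = <value>) afterwards, as a downstream Calc would.
  def lookupThenFilter(table: Seq[DimRow], id: Int, age: Int): Seq[DimRow] =
    lookupById(table, id).filter(_.age == age)

  // What a lookup on both keys (age, id) would return directly, for comparison.
  def lookupByBothKeys(table: Seq[DimRow], id: Int, age: Int): Seq[DimRow] =
    table.filter(r => r.id == id && r.age == age)
}
{code}

The price of the fallback is that the dimension table may return rows (here, the id=1 row with age 30) that the Calc discards afterwards.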

However, this approach requires knowing whether the only argument of the eval method represents 'age' or 'id', and the current API provides no such hint. As discussed with [~jark], we think this improvement would require refactoring `LookupTableSource`.
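The ambiguity can be illustrated with a small hypothetical helper (`KeyBindingAmbiguity` is not part of Flink) that lists every set of lookup keys that could be bound to an eval method taking a given number of key arguments. With lookup keys 'age, id' and only a one-argument eval, there are two candidates, and the signature alone cannot choose between them:

{code:scala}
object KeyBindingAmbiguity {
  // Hypothetical helper: every set of lookup keys that could bind to an eval
  // method with `arity` key arguments (resultFuture is not counted).
  // Counts key sets only; argument order within a set is a separate convention.
  def candidateBindings(lookupKeys: List[String], arity: Int): List[List[String]] =
    lookupKeys.combinations(arity).toList

  // True when the eval signature alone cannot tell which keys it receives.
  def isAmbiguous(lookupKeys: List[String], arity: Int): Boolean =
    candidateBindings(lookupKeys, arity).size > 1
}
{code}

For example, `candidateBindings(List("age", "id"), 1)` yields `List(List("age"), List("id"))`.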

We prefer to let the user handle this case by adding an eval method that matches the lookup keys, for example:
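{code:scala}
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}

import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.AsyncTableFunction

import scala.annotation.varargs

// First solution: one eval method with a variable number of lookup keys,
// so it matches both one key and two keys.
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  @varargs
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer*): Unit = {
    // look up the dimension table by the given keys, then complete resultFuture
  }
}

// Second solution (an alternative to the first, so define only one of the two
// classes): multiple eval methods, each with a fixed number of lookup keys.
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
    // one lookup key
  }

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit = {
    // two lookup keys
  }
}
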
{code}
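To make the first solution concrete, here is a plain-Scala stand-in (no Flink dependency) for a varargs eval that handles both one and two lookup keys. `DimRow` and `InMemoryLookup` are hypothetical, the result type is simplified to `Seq[DimRow]` instead of `JCollection[RowData]`, and the (age, id) argument order for two keys is an assumption that should be checked against the lookup key order the planner actually uses:

{code:scala}
import java.util.concurrent.CompletableFuture
import scala.annotation.varargs

// Hypothetical in-memory stand-in for the dimension table; not a Flink API.
final case class DimRow(id: Int, name: String, age: Int)

final class InMemoryLookup(rows: Seq[DimRow]) {
  // Same shape as the first solution: one varargs eval that completes a future.
  // ASSUMPTION: with two keys the arguments arrive as (age, id); verify this
  // against the lookup key order of your planner version.
  @varargs
  def eval(resultFuture: CompletableFuture[Seq[DimRow]], keys: Integer*): Unit = {
    val matches = keys.map(_.intValue) match {
      case Seq(id)      => rows.filter(_.id == id)
      case Seq(age, id) => rows.filter(r => r.age == age && r.id == id)
      case other        => throw new IllegalArgumentException(s"unsupported key count: ${other.size}")
    }
    resultFuture.complete(matches)
  }
}
{code}

Under that assumption, `eval(future, 1)` completes the future with every row whose id is 1, while `eval(future, 10, 1)` keeps only the rows with age 10 and id 1.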
What do you think, [~gsavl]?

 

> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
>                 Key: FLINK-22955
>                 URL: https://issues.apache.org/jira/browse/FLINK-22955
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.13.1, 1.12.4
>         Environment: Flink 1.13.1
> how to reproduce: patch file attached
>            Reporter: Cooper Luan
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.5, 1.13.2
>
>         Attachments: 
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> A SQL query like this may result in a lookup function signature mismatch exception when explaining the SQL:
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> The lookup function is:
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }{code}
> The execution plan is:
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> The "lookup=[age=10, id=a]" results in the function signature mismatch.
>  
> But if I add one more insert, it works well:
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> The execution plan is:
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> The LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong).
>  
> So in the "multi insert" case the planner works fine, but in the "single insert" case the planner throws an exception.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
