[ https://issues.apache.org/jira/browse/FLINK-36930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lilo closed FLINK-36930.
------------------------
    Resolution: Not A Bug

Well, it seems I made a mistake here. When the join is based only on constant 
conditions, all filters are pushed down into the source, so the set of lookup 
keys is empty, which seems reasonable.
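
For anyone hitting the same validation error, a minimal workaround sketch (it 
simply mirrors the working query from the report below): keep at least one 
equality between the lookup table and a probe-side column so the planner can 
derive a lookup key, and let the purely constant predicate be pushed into the 
JDBC source as a filter.

{code:sql}
-- Workaround sketch (same tables as in the report below): the equality on a
-- probe-side field gives the planner a lookup key; the constant predicate on
-- t2.id is pushed down to the JDBC source as a filter.
SELECT
    t1.*,
    t2.v AS lookup_value
FROM main_table t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t2.sub_id = t1.some_sub_id
    AND t2.id = 1;
{code}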

> Lookup join with JDBC connector fails when join condition does not include 
> fields from the probe/left table
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36930
>                 URL: https://issues.apache.org/jira/browse/FLINK-36930
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.17.0, 1.18.1, 1.19.1
>         Environment:  * Flink versions affected: 1.17.0 and later versions (tested on 1.19.1)
>  * Flink version where this worked: 1.16.2
>  * Connector: JDBC (tested with Apache Derby and MySQL; likely reproducible with other JDBC drivers). Notably, this issue does not affect the Hive connector.
>            Reporter: Lilo
>            Priority: Critical
>
> I've encountered a regression in Flink 1.17.0 (and later versions) related to 
> lookup joins when using the JDBC connector as the lookup source. The issue 
> arises when the join condition does not include fields from the probe/left 
> table. This scenario worked correctly in Flink 1.16.2 but now throws a 
> `TableException`.
>  
>  
> The following SQL code demonstrates the problem.
> {code:sql}
> SET 'execution.target' = 'local';
> -- Create the lookup table (using Apache Derby as an example)
> DROP TEMPORARY TABLE IF EXISTS lookup_table;
> CREATE TEMPORARY TABLE lookup_table (
>     id INT,
>     sub_id INT,
>     v STRING
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:derby:memory:myInMemoryDB;create=true',
>     'table-name' = 'lookup_table'
> );
> -- Create the main table (using datagen connector)
> DROP TEMPORARY TABLE IF EXISTS main_table;
> CREATE TEMPORARY TABLE main_table (
>     v STRING,
>     some_id INT,
>     some_sub_id INT,
>     proctime AS PROCTIME()
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '2',
>     'fields.some_id.kind' = 'random',
>     'fields.some_id.min' = '1',
>     'fields.some_id.max' = '5',
>     'fields.some_sub_id.kind' = 'random',
>     'fields.some_sub_id.min' = '1',
>     'fields.some_sub_id.max' = '3',
>     'fields.v.length' = '10'
> );
> -- This lookup join works correctly (join condition includes a field from main_table)
> EXPLAIN PLAN FOR
> SELECT
>     t1.*,
>     t2.v AS lookup_value
> FROM main_table t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>     ON t2.id = 1
>     AND t2.sub_id = t1.some_sub_id;
> -- This lookup join FAILS in Flink 1.17.0+ (join condition only uses constants)
> EXPLAIN PLAN FOR
> SELECT
>     t1.*,
>     t2.v AS lookup_value
> FROM main_table t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>     ON t2.id = 1
>     AND t2.sub_id = 1;
> -- Attempting to bypass the issue with a view also fails
> DROP VIEW IF EXISTS main_table_view;
> CREATE TEMPORARY VIEW main_table_view AS SELECT *, 1 AS fake_id FROM main_table;
> EXPLAIN PLAN FOR
> SELECT
>     t1.*,
>     t2.v AS lookup_value
> FROM main_table_view t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>     ON t2.id = 1
>     AND t2.sub_id = t1.fake_id;
> {code}
> *Expected Behavior:*
> These queries should execute successfully, even when the join condition does 
> not involve fields from the main table. This was the behavior in Flink 1.16.2.
> *Actual Behavior:*
> In Flink 1.17.0 and later, the second query (where the join condition only 
> uses constants) throws the following exception (from Flink 1.17.2):
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.lookup_table].
>       at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:687)
>       at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:249)
>       at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:157)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>       at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
>       at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
