[ https://issues.apache.org/jira/browse/FLINK-36930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lilo closed FLINK-36930.
------------------------
Resolution: Not A Bug
Well, it seems I made a mistake here. When the join is based only on constant
conditions, all of those conditions are pushed down to the lookup source as
filters, so the set of lookup keys is empty. That behavior seems reasonable.
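To make the explanation above concrete, here is a toy model in Java of how a planner might split lookup-join equality conditions into lookup keys and pushed-down filters. It is a hypothetical simplification, not Flink's planner code: the names `LookupKeySketch`, `Equality`, `Split`, `split`, and `validate` are invented for illustration, and the rule "a constant condition is pushed down whenever the source accepts filters" is an assumption based on the comment above. Under that assumption, a condition list made only of constants leaves no lookup keys, and the validation step rejects the join with the same message as the stack trace in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, NOT Flink code) of splitting lookup-join
// equality conditions into lookup keys and filters pushed to the source.
public class LookupKeySketch {

    // One equality "lookupField = rhs" from the ON clause. rhsIsConstant is
    // true for literals such as 1, false for probe-side fields such as t1.some_sub_id.
    record Equality(String lookupField, String rhs, boolean rhsIsConstant) {}

    // Conditions kept as lookup keys vs. conditions handed to the source as filters.
    record Split(List<Equality> lookupKeys, List<Equality> pushedFilters) {}

    // Assumed rule: constant conditions are pushed down when the source
    // accepts filters; every other condition stays a lookup key.
    static Split split(List<Equality> conditions, boolean sourceAcceptsFilters) {
        List<Equality> keys = new ArrayList<>();
        List<Equality> pushed = new ArrayList<>();
        for (Equality e : conditions) {
            if (e.rhsIsConstant() && sourceAcceptsFilters) {
                pushed.add(e);
            } else {
                keys.add(e);
            }
        }
        return new Split(keys, pushed);
    }

    // Models the reported check: no lookup keys left means the join is rejected.
    static void validate(Split result, String table) {
        if (result.lookupKeys().isEmpty()) {
            throw new IllegalStateException(
                    "Temporal table join requires an equality condition on fields of table ["
                            + table + "].");
        }
    }

    public static void main(String[] args) {
        String table = "default_catalog.default_database.lookup_table";

        // First query: t2.id = 1 AND t2.sub_id = t1.some_sub_id
        Split mixed = split(List.of(
                new Equality("id", "1", true),
                new Equality("sub_id", "t1.some_sub_id", false)), true);
        validate(mixed, table); // passes: sub_id remains a lookup key
        System.out.println("mixed: keys=" + mixed.lookupKeys().size()
                + ", pushed=" + mixed.pushedFilters().size());

        // Second query: t2.id = 1 AND t2.sub_id = 1
        Split constantsOnly = split(List.of(
                new Equality("id", "1", true),
                new Equality("sub_id", "1", true)), true);
        try {
            validate(constantsOnly, table);
        } catch (IllegalStateException ex) {
            System.out.println("constants only: " + ex.getMessage());
        }
    }
}
```

Calling `split(..., false)` models a source that does not accept pushed-down filters: in that case the constants stay lookup keys and `validate` passes. This only describes the toy model, not any specific connector.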
> Lookup join with JDBC connector fails when join condition does not include
> fields from the probe/left table
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36930
> URL: https://issues.apache.org/jira/browse/FLINK-36930
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.17.0, 1.18.1, 1.19.1
> Environment: * Flink versions affected: 1.17.0 and later
> (tested on 1.19.1)
> * Flink version where this worked: 1.16.2
> * Connector: JDBC (tested with Apache Derby and MySQL; likely reproducible
> with other JDBC drivers). Notably, this issue does not affect the Hive
> connector.
> Reporter: Lilo
> Priority: Critical
>
> I've encountered a regression in Flink 1.17.0 (and later versions) related to
> lookup joins when using the JDBC connector as the lookup source. The issue
> arises when the join condition does not include fields from the probe/left
> table. This scenario worked correctly in Flink 1.16.2 but now throws a
> `TableException`.
>
>
> The following SQL code demonstrates the problem.
> {code:sql}
> SET 'execution.target' = 'local';
> -- Create the lookup table (using Apache Derby as an example)
> DROP TEMPORARY TABLE IF EXISTS lookup_table;
> CREATE TEMPORARY TABLE lookup_table (
>   id INT,
>   sub_id INT,
>   v STRING
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:derby:memory:myInMemoryDB;create=true',
>   'table-name' = 'lookup_table'
> );
> -- Create the main table (using datagen connector)
> DROP TEMPORARY TABLE IF EXISTS main_table;
> CREATE TEMPORARY TABLE main_table (
>   v STRING,
>   some_id INT,
>   some_sub_id INT,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '2',
>   'fields.some_id.kind' = 'random',
>   'fields.some_id.min' = '1',
>   'fields.some_id.max' = '5',
>   'fields.some_sub_id.kind' = 'random',
>   'fields.some_sub_id.min' = '1',
>   'fields.some_sub_id.max' = '3',
>   'fields.v.length' = '10'
> );
> -- This lookup join works correctly (join condition includes a field
> -- from main_table)
> EXPLAIN PLAN FOR
> SELECT
>   t1.*,
>   t2.v AS lookup_value
> FROM main_table t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>   ON t2.id = 1
>   AND t2.sub_id = t1.some_sub_id;
> -- This lookup join FAILS in Flink 1.17.0+ (join condition only uses
> -- constants)
> EXPLAIN PLAN FOR
> SELECT
>   t1.*,
>   t2.v AS lookup_value
> FROM main_table t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>   ON t2.id = 1
>   AND t2.sub_id = 1;
> -- Attempting to bypass the issue with a view also fails
> DROP TEMPORARY VIEW IF EXISTS main_table_view;
> CREATE TEMPORARY VIEW main_table_view AS
>   SELECT *, 1 AS fake_id FROM main_table;
> EXPLAIN PLAN FOR
> SELECT
>   t1.*,
>   t2.v AS lookup_value
> FROM main_table_view t1
> INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
>   ON t2.id = 1
>   AND t2.sub_id = t1.fake_id;
> {code}
> *Expected Behavior:*
> These queries should execute successfully, even when the join condition does
> not involve fields from the main table. This was the behavior in Flink 1.16.2.
> *Actual Behavior:*
> In Flink 1.17.0 and later, the second query (where the join condition only
> uses constants) throws the following exception (from Flink 1.17.2):
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.lookup_table].
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:687)
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:249)
> 	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:157)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)