Qingsheng Ren created FLINK-36808:
-------------------------------------
Summary: UNION ALL after lookup join produces unexpected results
Key: FLINK-36808
URL: https://issues.apache.org/jira/browse/FLINK-36808
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: Qingsheng Ren
Here is the SQL to reproduce the issue:
{code:sql}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
  `id` BIGINT,
  `name` STRING,
  `txn_time` as proctime(),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
  `id` BIGINT,
  `status` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup-join the two tables twice with different filters, then union the results
SELECT
  s.id,
  s.name,
  s.txn_time,
  d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
  ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'OK'
UNION ALL
SELECT
  s.id,
  s.name,
  s.txn_time,
  d.status
FROM `stream` AS `s`
INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
  ON `s`.`id` = `d`.`id`
WHERE `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
(1, Alice, 2024-11-27 11:52:19.332, OK)
(2, Bob, 2024-11-27 11:52:19.332, OK) {code}
The second lookup join should output nothing, because no row in `dim` has the
status 'NOT_EXISTS'.
However, the result after the UNION ALL is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
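There shouldn't be any rows with status 'NOT_EXISTS'.
The SQL plan shows that the LookupJoin looks up `dim` by `id` only and carries
no filter on `status`. The constants 'OK' and 'NOT_EXISTS' are instead appended
directly as the `status` column by the Calc after the lookup join, which is not
expected.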
{code:java}
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, dim]])

== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])

== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
      +- Reused(reference_id=[1])
{code}
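The gap between the expected semantics and what the optimized plan appears to compute can be sketched with a small toy model. This is plain Python, not Flink code. The names `STREAM`, `DIM`, `expected_branch`, `observed_branch` and `union_all` are hypothetical and exist only for this illustration, and the `txn_time` column is left out because it does not affect the outcome. The sketch assumes the plan behaves as it reads: the lookup matches on `id` only, and the Calc then emits the filter constant as the `status` column without comparing it to the looked-up value.

```python
# Toy model of the two UNION ALL branches from the report; not Flink code.
STREAM = [(1, "Alice"), (2, "Bob")]   # rows of `stream`, as given in the report
DIM = {1: "OK", 2: "OK"}              # rows of `dim`, keyed by id


def expected_branch(wanted_status):
    """Lookup join on id, then keep only rows whose looked-up status matches."""
    out = []
    for sid, name in STREAM:
        status = DIM.get(sid)         # lookup by join key
        if status is not None and status == wanted_status:
            out.append((sid, name, status))
    return out


def observed_branch(wanted_status):
    """Assumed reading of the optimized plan: lookup on id only, then a Calc
    that writes the constant into `status` and never applies the filter."""
    out = []
    for sid, name in STREAM:
        if sid in DIM:                # lookup=[id=id], no predicate on status
            out.append((sid, name, wanted_status))
    return out


def union_all(branch):
    """Concatenate the 'OK' branch and the 'NOT_EXISTS' branch."""
    return branch("OK") + branch("NOT_EXISTS")
```

With the data from the report, `union_all(expected_branch)` yields only the two 'OK' rows, while `union_all(observed_branch)` yields four rows, matching the incorrect output shown above.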