[ 
https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-36808:
----------------------------------
    Description: 
Here is the SQL to reproduce the issue:
{code:java}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
    `id` BIGINT,
    `name` STRING,
    `txn_time` as proctime(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
    `id` BIGINT,
    `status` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup join two tables twice with different filter, and union them together
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'OK' 
UNION ALL
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
(1, Alice, 2024-11-27 11:52:19.332, OK)
(2, Bob, 2024-11-27 11:52:19.332, OK) {code}
The second lookup join should output nothing, as there is no row with status 
'NOT_EXISTS'.

But the result after union is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
There shouldn't be any rows with status 'NOT_EXISTS'. 

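The SQL plan shows that the filter conditions on 'OK' and 'NOT_EXISTS' are 
lost: each LookupJoin only looks up on id=id, and the constants 'OK' and 
'NOT_EXISTS' are simply projected as `status` by the Calc after the lookup 
join, which is not expected. Because both LookupJoin nodes then become 
identical, the execution plan reuses a single LookupJoin for both branches of 
the UNION ALL.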
{code:java}
| == Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])

== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, 
CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])

== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, 
id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS 
VARCHAR(2147483647)) AS status])
      +- Reused(reference_id=[1])
 | {code}

  was:
Here is the SQL to reproduce the issue:
{code:java}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
    `id` BIGINT,
    `name` STRING,
    `txn_time` as proctime(),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
    `id` BIGINT,
    `status` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup join two tables twice with different filter, and union them together
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'OK' 
UNION ALL
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
(1, Alice, 2024-11-27 11:52:19.332, OK)
(2, Bob, 2024-11-27 11:52:19.332, OK) {code}
The second lookup join should output nothing, as there is no row with status 
'NOT_EXISTS'.

But the result after union is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
There shouldn't be any rows with status 'NOT_EXISTS'. 

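The SQL plan shows that the filter conditions on 'OK' and 'NOT_EXISTS' are 
lost: each LookupJoin only looks up on id=id, and the constants 'OK' and 
'NOT_EXISTS' are simply projected as `status` by the Calc after the lookup 
join, which is not expected. Because both LookupJoin nodes then become 
identical, the execution plan reuses a single LookupJoin for both branches of 
the UNION ALL.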
{code:java}
| == Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, 
CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, 
id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS 
VARCHAR(2147483647)) AS status])
      +- Reused(reference_id=[1])
 | {code}


> UNION ALL after lookup join produces unexpected results
> -------------------------------------------------------
>
>                 Key: FLINK-36808
>                 URL: https://issues.apache.org/jira/browse/FLINK-36808
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>            Reporter: Qingsheng Ren
>            Priority: Major
>
> Here is the SQL to reproduce the issue:
> {code:java}
> -- Data of table `stream`:
> -- (1, Alice)
> -- (2, Bob)
> CREATE TEMPORARY TABLE `stream` (
>     `id` BIGINT,
>     `name` STRING,
>     `txn_time` as proctime(),
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'stream',
>   'username' = 'postgres',
>   'password' = 'postgres'
> );
> -- Data of table `dim`:
> -- (1, OK)
> -- (2, OK)
> CREATE TEMPORARY TABLE `dim` (
>     `id` BIGINT,
>     `status` STRING,
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'dim',
>   'username' = 'postgres',
>   'password' = 'postgres'
> );
> -- Lookup join two tables twice with different filter, and union them together
> SELECT
>      s.id,
>      s.name,
>      s.txn_time,
>      d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
> `d`
> ON
>      `s`.`id` = `d`.`id`
> WHERE
>      `d`.`status` = 'OK' 
> UNION ALL
> SELECT
>      s.id,
>      s.name,
>      s.txn_time,
>      d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
> `d`
> ON
>      `s`.`id` = `d`.`id`
> WHERE
>      `d`.`status` = 'NOT_EXISTS';{code}
> The first lookup join should output:
> {code:java}
> (1, Alice, 2024-11-27 11:52:19.332, OK)
> (2, Bob, 2024-11-27 11:52:19.332, OK) {code}
> The second lookup join should output nothing, as there is no row with status 
> 'NOT_EXISTS'.
> But the result after union is:
> {code:java}
> 1, Alice, 2024-11-27 11:52:19.332, OK
> 2, Bob, 2024-11-27 11:52:19.332, OK
> 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
> 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
> There shouldn't be any rows with status 'NOT_EXISTS'. 
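> The SQL plan shows that the filter conditions on 'OK' and 'NOT_EXISTS' are 
> lost: each LookupJoin only looks up on id=id, and the constants 'OK' and 
> 'NOT_EXISTS' are simply projected as `status` by the Calc after the lookup 
> join, which is not expected. Because both LookupJoin nodes then become 
> identical, the execution plan reuses a single LookupJoin for both branches 
> of the UNION ALL. 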
> {code:java}
> | == Abstract Syntax Tree ==
> LogicalUnion(all=[true])
> :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
> :  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
> :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{0, 2}])
> :        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
> :        :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> stream]])
> :        +- LogicalFilter(condition=[=($cor0.id, $0)])
> :           +- LogicalSnapshot(period=[$cor0.txn_time])
> :              +- LogicalTableScan(table=[[default_catalog, default_database, 
> dim]])
> +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
>    +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
>       +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{0, 2}])
>          :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
>          :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> stream]])
>          +- LogicalFilter(condition=[=($cor1.id, $0)])
>             +- LogicalSnapshot(period=[$cor1.txn_time])
>                +- LogicalTableScan(table=[[default_catalog, default_database, 
> dim]])
> == Optimized Physical Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
> status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], 
> joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, 
> stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, 
> CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>       +- LookupJoin(table=[default_catalog.default_database.dim], 
> joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>          +- Calc(select=[id, name, PROCTIME() AS txn_time])
>             +- TableSourceScan(table=[[default_catalog, default_database, 
> stream]], fields=[id, name])
> == Optimized Execution Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS 
> status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], 
> joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, 
> id])(reuse_id=[1])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, 
> stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS 
> VARCHAR(2147483647)) AS status])
>       +- Reused(reference_id=[1])
>  | {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to