Caizhi Weng created FLINK-25357:
-----------------------------------
Summary: SQL planner incorrectly changes a streaming join with
FLOOR(rowtime) into interval join
Key: FLINK-25357
URL: https://issues.apache.org/jira/browse/FLINK-25357
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.14.2
Reporter: Caizhi Weng
This issue is reported from the [user mailing
list|https://lists.apache.org/thread/v8omhomp58hb8m5dj4noxbr1dsyy6zjl].
Add the following test case to {{TableEnvironmentITCase}} to reproduce this
issue.
{code:scala}
@Test
def myTest(): Unit = {
  val data = Seq(
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 12, 5, 8)),
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 13, 5, 4)),
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 14, 5, 6))
  )
  tEnv.executeSql(
    s"""
       |create table T (
       |  id STRING,
       |  b TIMESTAMP(3),
       |  WATERMARK FOR b AS b - INTERVAL '60' MINUTES
       |) WITH (
       |  'connector' = 'values',
       |  'bounded' = 'true',
       |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
       |)
       |""".stripMargin)
  tEnv.executeSql(
    """
      |SELECT
      |  source.id AS sourceid,
      |  CAST(source.b AS TIMESTAMP) AS source_startat,
      |  CAST(target.b AS TIMESTAMP) AS target_startat
      |FROM T source, T target
      |WHERE source.id = target.id
      |AND source.id IN ('1', '2', '3')
      |AND source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR
      |AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
      |""".stripMargin).print()
}
{code}
Results (correct) for the batch job are:
{code}
+--------------------------------+----------------------------+----------------------------+
|                       sourceid |             source_startat |             target_startat |
+--------------------------------+----------------------------+----------------------------+
|                              1 | 2021-12-13 13:05:04.000000 | 2021-12-13 12:05:08.000000 |
|                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+--------------------------------+----------------------------+----------------------------+
{code}
Results (incorrect) for the streaming job are:
{code}
+----+--------------------------------+----------------------------+----------------------------+
| op |                       sourceid |             source_startat |             target_startat |
+----+--------------------------------+----------------------------+----------------------------+
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 12:05:08.000000 |
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+----+--------------------------------+----------------------------+----------------------------+
{code}
The plan for the streaming job is:
{code}
LogicalProject(sourceid=[$0], source_startat=[CAST($1):TIMESTAMP(6)], target_startat=[CAST($3):TIMESTAMP(6)])
+- LogicalFilter(condition=[AND(=($0, $2), OR(=($0, _UTF-16LE'1'), =($0, _UTF-16LE'2'), =($0, _UTF-16LE'3')), >=($1, +(FLOOR($3, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <($1, +(FLOOR($3, FLAG(HOUR)), 7200000:INTERVAL HOUR)))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, T]])
      +- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
         +- LogicalTableScan(table=[[default_catalog, default_database, T]])

== Optimized Physical Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id, id0), >=(b, +(FLOOR(b0, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <(b, +(FLOOR(b0, FLAG(HOUR)), 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
         +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])

== Optimized Execution Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[((id = id0) AND (b >= (FLOOR(b0, FLAG(HOUR)) + 3600000:INTERVAL HOUR)) AND (b < (FLOOR(b0, FLAG(HOUR)) + 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[(b - 3600000:INTERVAL MINUTE)])(reuse_id=[1])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- Reused(reference_id=[1])
{code}
You can see that the planner incorrectly rewrites this regular join into an interval join: the derived window bounds (leftLowerBound=3600000, leftUpperBound=7199999) are applied to the raw rowtime {{target.b}} rather than to {{FLOOR(target.b TO HOUR)}}. The generated condition function for the interval join is also incorrect (it unconditionally returns {{true}}, dropping the FLOOR-based predicates), which causes the 1st line of the streaming result to be produced:
{code:java}
public class ConditionFunction$171
    extends org.apache.flink.api.common.functions.AbstractRichFunction
    implements org.apache.flink.table.runtime.generated.JoinCondition {

  public ConditionFunction$171(Object[] references) throws Exception {}

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters)
      throws Exception {}

  @Override
  public boolean apply(
      org.apache.flink.table.data.RowData in1,
      org.apache.flink.table.data.RowData in2)
      throws Exception {
    return true;
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}
{code}
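To illustrate the effect: compare the window the query asks for with the window the interval join actually enforces. This is a minimal sketch in plain Python (not Flink code), using the timestamps of the spurious first streaming row from the test data above:

{code:python}
from datetime import datetime, timedelta

def floor_to_hour(ts):
    # FLOOR(ts TO HOUR): truncate minutes and seconds
    return ts.replace(minute=0, second=0, microsecond=0)

target_b = datetime(2021, 12, 13, 12, 5, 8)   # target row
source_b = datetime(2021, 12, 13, 14, 5, 6)   # source row of the spurious +I line

# Window the query asks for:
#   FLOOR(target.b TO HOUR) + 1h <= source.b < FLOOR(target.b TO HOUR) + 2h
lo = floor_to_hour(target_b) + timedelta(hours=1)   # 13:00:00
hi = floor_to_hour(target_b) + timedelta(hours=2)   # 14:00:00
print(lo <= source_b < hi)                          # False: 14:05:06 is outside [13:00, 14:00)

# Window the interval join enforces (bounds relative to the raw target.b,
# per leftLowerBound=3600000 / leftUpperBound=7199999):
lo_wrong = target_b + timedelta(hours=1)            # 13:05:08
hi_wrong = target_b + timedelta(hours=2)            # 14:05:08
print(lo_wrong <= source_b < hi_wrong)              # True: the spurious match
{code}

With the condition function always returning {{true}}, only the shifted bounds are checked, so the pair (14:05:06, 12:05:08) is emitted even though the query's FLOOR-based predicate rejects it.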
--
This message was sent by Atlassian Jira
(v8.20.1#820001)