Caizhi Weng created FLINK-22730:
-----------------------------------
Summary: Lookup join condition with CURRENT_DATE fails to filter
records
Key: FLINK-22730
URL: https://issues.apache.org/jira/browse/FLINK-22730
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.13.0, 1.12.0
Reporter: Caizhi Weng
Add the following test case to
org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.
{code:scala}
// Reproduction case: a lookup join whose WHERE clause compares a timestamp
// (cast to a string) against CURRENT_DATE. Both registered timestamps are in
// the year 2000, so when run on any later date the query is expected to
// return no rows.
@Test
def myTest(): Unit = {
// Data for table Ta: a single row ("abc", 2000-01-01T00:00).
val id1 = TestValuesTableFactory.registerData(
Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0))))
// Ta also declares the computed column `proc`, which the join below uses in
// its FOR SYSTEM_TIME AS OF clause.
val ddl1 =
s"""
|CREATE TABLE Ta (
| id VARCHAR,
| ts TIMESTAMP,
| proc AS PROCTIME()
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$id1',
| 'bounded' = 'true'
|)
|""".stripMargin
tEnv.executeSql(ddl1)
// Data for table Tb (the looked-up side): a single row ("abc", 2000-01-02T00:00).
val id2 = TestValuesTableFactory.registerData(
Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0))))
val ddl2 =
s"""
|CREATE TABLE Tb (
| id VARCHAR,
| ts TIMESTAMP
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$id2',
| 'bounded' = 'true'
|)
|""".stripMargin
tEnv.executeSql(ddl2)
// Join Ta with Tb on id and keep only rows whose COALESCE(t1.ts, t2.ts),
// rendered as a string, is >= today's date followed by ' 00:00:00'.
val it = tEnv.executeSql(
"""
|SELECT * FROM Ta AS t1
|INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2
|ON t1.id = t2.id
|WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >=
CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')
|""".stripMargin).collect()
// Print every row that passed the filter.
while (it.hasNext) {
System.out.println(it.next())
}
}
{code}
The result is
{code}
+I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
{code}
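which is obviously incorrect: {{t1.ts}} is 2000-01-01 00:00:00, which is
earlier than the current date (2021-05-20 in this run), so the WHERE
condition should have filtered this row out.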
The generated collector class is as follows:
{code:java}
// Generated collector for the lookup join. For every looked-up record it
// evaluates the join's filter condition and, if the condition holds, emits the
// input row joined with the looked-up row.
public class JoinTableFuncCollector$22 extends
org.apache.flink.table.runtime.collector.TableFunctionCollector {
// Reusable output row holding the two fields copied from the looked-up record.
org.apache.flink.table.data.GenericRowData out = new
org.apache.flink.table.data.GenericRowData(2);
// Reusable wrapper that joins the input row with `out`.
joinedRow$9 = new
org.apache.flink.table.data.utils.JoinedRowData();
private static final java.util.TimeZone timeZone =
java.util.TimeZone.getTimeZone("Asia/Shanghai");
// `timestamp` and `localTimestamp` are declared but not referenced anywhere
// else in this class.
private org.apache.flink.table.data.TimestampData timestamp;
private org.apache.flink.table.data.TimestampData localTimestamp;
// NOTE(review): `date` is read in collect() but is never assigned in the
// constructor, open() or collect(), so it still holds Java's default int
// value 0 when it is used.
private int date;
// Constant suffix appended to the date string in the filter condition.
private final org.apache.flink.table.data.binary.BinaryStringData str$17 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00");
public JoinTableFuncCollector$22(Object[] references) throws Exception {
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters)
throws Exception {
}
// Evaluates the filter for one looked-up record and outputs the joined row
// when the filter passes.
@Override
public void collect(Object record) throws Exception {
// in1 is the row obtained from getInput(); in2 is the looked-up record.
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) getInput();
org.apache.flink.table.data.RowData in2 =
(org.apache.flink.table.data.RowData) record;
org.apache.flink.table.data.binary.BinaryStringData field$7;
boolean isNull$7;
org.apache.flink.table.data.TimestampData field$8;
boolean isNull$8;
org.apache.flink.table.data.TimestampData field$10;
boolean isNull$10;
boolean isNull$13;
org.apache.flink.table.data.binary.BinaryStringData result$14;
boolean isNull$15;
org.apache.flink.table.data.binary.BinaryStringData result$16;
boolean isNull$18;
org.apache.flink.table.data.binary.BinaryStringData result$19;
boolean isNull$20;
boolean result$21;
// Read field 1 (timestamp) of the looked-up record.
isNull$8 = in2.isNullAt(1);
field$8 = null;
if (!isNull$8) {
field$8 = in2.getTimestamp(1, 6);
}
// Read field 0 (string) of the looked-up record.
isNull$7 = in2.isNullAt(0);
field$7 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$7) {
field$7 = ((org.apache.flink.table.data.binary.BinaryStringData)
in2.getString(0));
}
// Read field 1 (timestamp) of the input row.
isNull$10 = in1.isNullAt(1);
field$10 = null;
if (!isNull$10) {
field$10 = in1.getTimestamp(1, 6);
}
// COALESCE: take the input row's timestamp when it is non-null, otherwise the
// looked-up record's timestamp.
boolean result$11 = !isNull$10;
org.apache.flink.table.data.TimestampData result$12 = null;
boolean isNull$12;
if (result$11) {
isNull$12 = isNull$10;
if (!isNull$12) {
result$12 = field$10;
}
}
else {
isNull$12 = isNull$8;
if (!isNull$12) {
result$12 = field$8;
}
}
// Left-hand side of the comparison: the coalesced timestamp as a string.
isNull$13 = isNull$12;
result$14 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$13) {
result$14 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12,
6));
isNull$13 = (result$14 == null);
}
// Right-hand side, part 1: the `date` field as a string.
// NOTE(review): `date` was never set (see the field declaration), so this
// converts 0 rather than the current date; presumably that renders as
// 1970-01-01 - confirm against unixDateToString.
isNull$15 = false;
result$16 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$15) {
result$16 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int)
date)));
isNull$15 = (result$16 == null);
}
// Right-hand side, part 2: append the constant " 00:00:00".
result$19 =
org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) ?
null : (result$16), ( false ) ? null :
(((org.apache.flink.table.data.binary.BinaryStringData) str$17)));
isNull$18 = (result$19 == null);
if (isNull$18) {
result$19 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
}
// The >= comparison; the result stays false when either side is null.
isNull$20 = isNull$13 || isNull$18;
result$21 = false;
if (!isNull$20) {
result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) :
((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0;
}
// When the condition holds, copy the looked-up fields into `out`, join them
// with the input row and emit the result with the input row's RowKind.
if (result$21) {
if (isNull$7) {
out.setField(0, null);
} else {
out.setField(0, field$7);
}
if (isNull$8) {
out.setField(1, null);
} else {
out.setField(1, field$8);
}
joinedRow$9.replace(in1, out);
joinedRow$9.setRowKind(in1.getRowKind());
outputResult(joinedRow$9);
}
}
@Override
public void close() throws Exception {
}
}
{code}
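The member variable {{date}} is never assigned before it is read in
{{collect}}, so it keeps its default value 0 instead of holding the current
date. The comparison is therefore made against the epoch date rather than
today's date, and the record wrongly passes the filter. This is because
{{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}} forgets
to include {{${ctx.reusePerRecordCode()}}} when generating the {{collect}}
method.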
--
This message was sent by Atlassian Jira
(v8.3.4#803005)