[
https://issues.apache.org/jira/browse/FLINK-22730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350033#comment-17350033
]
xmarker commented on FLINK-22730:
---------------------------------
Could you assign this Jira issue to me?
> Lookup join condition with CURRENT_DATE fails to filter records
> ---------------------------------------------------------------
>
> Key: FLINK-22730
> URL: https://issues.apache.org/jira/browse/FLINK-22730
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0, 1.13.0, 1.14.0
> Reporter: Caizhi Weng
> Priority: Major
>
> Add the following test case to
> org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
> val id1 = TestValuesTableFactory.registerData(
> Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0))))
> val ddl1 =
> s"""
> |CREATE TABLE Ta (
> | id VARCHAR,
> | ts TIMESTAMP,
> | proc AS PROCTIME()
> |) WITH (
> | 'connector' = 'values',
> | 'data-id' = '$id1',
> | 'bounded' = 'true'
> |)
> |""".stripMargin
> tEnv.executeSql(ddl1)
> val id2 = TestValuesTableFactory.registerData(
> Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0))))
> val ddl2 =
> s"""
> |CREATE TABLE Tb (
> | id VARCHAR,
> | ts TIMESTAMP
> |) WITH (
> | 'connector' = 'values',
> | 'data-id' = '$id2',
> | 'bounded' = 'true'
> |)
> |""".stripMargin
> tEnv.executeSql(ddl2)
> val it = tEnv.executeSql(
> """
> |SELECT * FROM Ta AS t1
> |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2
> |ON t1.id = t2.id
> |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >=
> CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')
> |""".stripMargin).collect()
> while (it.hasNext) {
> System.out.println(it.next())
> }
> }
> {code}
> The result is
> {code}
> +I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
> {code}
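> which is incorrect: both timestamps are in the year 2000, which is earlier
> than the current date, so the WHERE condition should have filtered this
> record out.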
> The generated operator is as follows
> {code:java}
> public class JoinTableFuncCollector$22 extends
> org.apache.flink.table.runtime.collector.TableFunctionCollector {
> org.apache.flink.table.data.GenericRowData out = new
> org.apache.flink.table.data.GenericRowData(2);
> org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new
> org.apache.flink.table.data.utils.JoinedRowData();
> private static final java.util.TimeZone timeZone =
> java.util.TimeZone.getTimeZone("Asia/Shanghai");
> private org.apache.flink.table.data.TimestampData timestamp;
> private org.apache.flink.table.data.TimestampData localTimestamp;
> private int date;
> private final org.apache.flink.table.data.binary.BinaryStringData str$17
> = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00");
> public JoinTableFuncCollector$22(Object[] references) throws Exception {
> }
> @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception {
> }
> @Override
> public void collect(Object record) throws Exception {
> org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) getInput();
> org.apache.flink.table.data.RowData in2 =
> (org.apache.flink.table.data.RowData) record;
> org.apache.flink.table.data.binary.BinaryStringData field$7;
> boolean isNull$7;
> org.apache.flink.table.data.TimestampData field$8;
> boolean isNull$8;
> org.apache.flink.table.data.TimestampData field$10;
> boolean isNull$10;
> boolean isNull$13;
> org.apache.flink.table.data.binary.BinaryStringData result$14;
> boolean isNull$15;
> org.apache.flink.table.data.binary.BinaryStringData result$16;
> boolean isNull$18;
> org.apache.flink.table.data.binary.BinaryStringData result$19;
> boolean isNull$20;
> boolean result$21;
> isNull$8 = in2.isNullAt(1);
> field$8 = null;
> if (!isNull$8) {
> field$8 = in2.getTimestamp(1, 6);
> }
> isNull$7 = in2.isNullAt(0);
> field$7 =
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$7) {
> field$7 = ((org.apache.flink.table.data.binary.BinaryStringData)
> in2.getString(0));
> }
> isNull$10 = in1.isNullAt(1);
> field$10 = null;
> if (!isNull$10) {
> field$10 = in1.getTimestamp(1, 6);
> }
> boolean result$11 = !isNull$10;
> org.apache.flink.table.data.TimestampData result$12 = null;
> boolean isNull$12;
> if (result$11) {
> isNull$12 = isNull$10;
> if (!isNull$12) {
> result$12 = field$10;
> }
> }
> else {
> isNull$12 = isNull$8;
> if (!isNull$12) {
> result$12 = field$8;
> }
> }
> isNull$13 = isNull$12;
> result$14 =
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$13) {
> result$14 =
> org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12,
> 6));
> isNull$13 = (result$14 == null);
> }
> isNull$15 = false;
> result$16 =
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> if (!isNull$15) {
> result$16 =
> org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int)
> date)));
> isNull$15 = (result$16 == null);
> }
> result$19 =
> org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 )
> ? null : (result$16), ( false ) ? null :
> (((org.apache.flink.table.data.binary.BinaryStringData) str$17)));
> isNull$18 = (result$19 == null);
> if (isNull$18) {
> result$19 =
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> }
> isNull$20 = isNull$13 || isNull$18;
> result$21 = false;
> if (!isNull$20) {
> result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1)
> : ((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0;
> }
> if (result$21) {
> if (isNull$7) {
> out.setField(0, null);
> } else {
> out.setField(0, field$7);
> }
> if (isNull$8) {
> out.setField(1, null);
> } else {
> out.setField(1, field$8);
> }
> joinedRow$9.replace(in1, out);
> joinedRow$9.setRowKind(in1.getRowKind());
> outputResult(joinedRow$9);
> }
> }
> @Override
> public void close() throws Exception {
> }
> }
> {code}
> The member variable {{date}} is never assigned before it is used in
> {{collect}}, so it keeps its default value of 0 and the comparison is made
> against the wrong date. This causes the bug.
> The root cause is that
> {{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}}
> does not include {{${ctx.reusePerRecordCode()}}} when generating the
> {{collect}} method.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)