[ https://issues.apache.org/jira/browse/FLINK-30598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song updated FLINK-30598:
---------------------------------
    Fix Version/s: 1.18.0
                       (was: 1.17.0)

> Wrong code generated for WatermarkGenerator because of inconsistent source type info when deserialized from exec plan
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30598
>                 URL: https://issues.apache.org/jira/browse/FLINK-30598
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 1.18.0
>
>
> When compiling from an existing exec plan that contains a watermark declaration
> referring to a metadata column, the generated code for WatermarkGenerator may be
> wrong. Currently `DynamicTableSourceSpec.getTableSource` passes the user-defined
> schema to `SourceAbilitySpec` to perform optimizations such as projection and
> watermark pushdown, while the optimization path from SQL uses a fixed reordered
> form: "PHYSICAL COLUMNS + METADATA COLUMNS". When a metadata column is declared
> before a physical column, the two orders disagree, so a field index computed in
> one order resolves to a different column (and type) in the other.
> A repro case:
> {code}
> @Test
>     public void testWatermarkPushDownWithMetadata() throws Exception {
>         // to verify FLINK-: the case declares metadata field first, without 
> fix it will get a
>         // wrong code generated by WatermarkGeneratorCodeGenerator which 
> reference the incorrect
>         // varchar column as the watermark field.
>         createTestValuesSourceTable(
>                 "MyTable",
>                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
>                 new String[] {
>                     "ts timestamp(3) metadata",
>                     "a int",
>                     "b bigint",
>                     "c varchar",
>                     "watermark for ts as ts - interval '5' second"
>                 },
>                 new HashMap<String, String>() {
>                     {
>                         put("enable-watermark-push-down", "true");
>                         put("readable-metadata", "ts:timestamp(3)");
>                     }
>                 });
>         File sinkPath =
>                 createTestCsvSinkTable(
>                         "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");
>         compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3")
>                 .await();
>         assertResult(
>                 Arrays.asList(
>                         "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
>                         "5,3,I am fine.," + toLocalDateTime(5000L),
>                         "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
>                 sinkPath);
>     }
> {code}
> The wrong generated code snippet (`row.getString(3)` should instead read a `TimestampData`):
> {code}
> public Long currentWatermark(org.apache.flink.table.data.RowData row) throws Exception {
>           
>           org.apache.flink.table.data.binary.BinaryStringData field$19;
>           boolean isNull$19;
>           org.apache.flink.table.data.binary.BinaryStringData field$21;
>           boolean isNull$22;
>           org.apache.flink.table.data.TimestampData result$23;
>           boolean isNull$24;
>           org.apache.flink.table.data.TimestampData result$25;
>           
>           isNull$19 = row.isNullAt(3);
>           field$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>           if (!isNull$19) {
>             field$19 = ((org.apache.flink.table.data.binary.BinaryStringData) row.getString(3));
>           }
> {code}
>  
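The ordering mismatch described in the report can be illustrated with a minimal, hypothetical sketch in plain Java. This is not Flink planner code: `Column`, `physicalThenMetadata`, `indexOf` and `declaredSchema` are illustrative names only. The sketch assumes the SQL path resolves `ts` against the fixed "physical columns + metadata columns" order, while the exec-plan path resolves that same index against the user-declared order from the repro case.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration (not Flink planner code) of the FLINK-30598 column-order mismatch. */
public class WatermarkIndexMismatch {

    /** Illustrative column model: name, SQL type, and whether it is a metadata column. */
    record Column(String name, String type, boolean metadata) {}

    /** Mimics the SQL path's fixed "PHYSICAL COLUMNS + METADATA COLUMNS" order. */
    static List<Column> physicalThenMetadata(List<Column> declared) {
        List<Column> result = new ArrayList<>();
        for (Column c : declared) {
            if (!c.metadata()) {
                result.add(c); // physical columns first, in declared order
            }
        }
        for (Column c : declared) {
            if (c.metadata()) {
                result.add(c); // metadata columns appended at the end
            }
        }
        return result;
    }

    /** Returns the position of the named column, or -1 if it is absent. */
    static int indexOf(List<Column> columns, String name) {
        for (int i = 0; i < columns.size(); i++) {
            if (columns.get(i).name().equals(name)) {
                return i;
            }
        }
        return -1;
    }

    /** Schema from the repro case, in user-declared order (metadata column first). */
    static List<Column> declaredSchema() {
        return List.of(
                new Column("ts", "TIMESTAMP(3)", true),
                new Column("a", "INT", false),
                new Column("b", "BIGINT", false),
                new Column("c", "VARCHAR", false));
    }

    public static void main(String[] args) {
        List<Column> declared = declaredSchema();
        List<Column> produced = physicalThenMetadata(declared);

        // Index of the watermark column as computed against the reordered (SQL path) schema.
        int tsIndex = indexOf(produced, "ts");

        // Resolving that same index against the user-declared order picks a different column.
        System.out.println("ts index in reordered schema: " + tsIndex);
        System.out.println("reordered[" + tsIndex + "] = " + produced.get(tsIndex));
        System.out.println("declared[" + tsIndex + "] = " + declared.get(tsIndex));
    }
}
```

In this sketch, `ts` lands at index 3 in the reordered schema. Index 3 of the declared order is `c VARCHAR`, which is consistent with the `BinaryStringData` / `row.getString(3)` access in the generated code quoted in the report.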



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
