[ https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Francesco Guardiani updated FLINK-26549:
----------------------------------------
Attachment: Least_restrictive_issue.patch
> INSERT INTO with VALUES leads to wrong type inference with nested types
> -----------------------------------------------------------------------
>
> Key: FLINK-26549
> URL: https://issues.apache.org/jira/browse/FLINK-26549
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Francesco Guardiani
> Assignee: Francesco Guardiani
> Priority: Major
> Attachments: Least_restrictive_issue.patch
>
>
> While working on casting, I found an interesting bug in the type inference
> of INSERT INTO ... VALUES. It surfaces in
> {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (see in particular this
> version:
> https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO ... VALUES query that also writes
> metadata to a Kafka table. In particular, it writes the {{headers}} metadata
> column. The table is declared as follows:
> {code:sql}
> CREATE TABLE kafka (
> `physical_1` STRING,
> `physical_2` INT,
> `timestamp-type` STRING METADATA VIRTUAL,
> `timestamp` TIMESTAMP(3) METADATA,
> `leader-epoch` INT METADATA VIRTUAL,
> `headers` MAP<STRING, BYTES> METADATA,
> `partition` INT METADATA VIRTUAL,
> `topic` STRING METADATA VIRTUAL,
> `physical_3` BOOLEAN
> ) WITH (
> 'connector' = 'kafka',
> [...]
> )
> {code}
> The INSERT INTO query is:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
> {code}
> Note that the byte literals in the first row are 3 and 2 bytes long
> (x'C0FFEE' and x'BABE'), while those in the last row are only 1 byte long
> (X'10' and X'20').
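> When the VALUES rows are unioned, the planner has to derive one common
> ("least restrictive") type per column, which is why these lengths matter.
> The Java sketch below is a simplified model of the widening one would expect
> for the {{headers}} column. It is not Flink's or Calcite's implementation:
> the class and method names are hypothetical, nullability and character sets
> are ignored, the 'k1'/'k2' keys are assumed to be typed CHAR(2), and it
> needs Java 16+ for records.

```java
import java.util.List;

// Hypothetical, simplified model of "least restrictive" type inference.
// NOT Flink's or Calcite's implementation: it ignores nullability and
// character sets and only handles the CHAR and BINARY families.
class LeastRestrictiveSketch {
    static final int MAX = Integer.MAX_VALUE; // 2147483647, i.e. STRING / BYTES

    // family is "CHAR" or "BINARY"; fixed == true means CHAR(n) / BINARY(n),
    // fixed == false means VARCHAR(n) / VARBINARY(n).
    record SqlType(String family, boolean fixed, int length) {
        @Override
        public String toString() {
            return (fixed ? "" : "VAR") + family + "(" + length + ")";
        }
    }

    record MapType(SqlType key, SqlType value) {
        @Override
        public String toString() {
            return "(" + key + ", " + value + ") MAP";
        }
    }

    // Common supertype of two types in the same family: keep the larger length,
    // and stay fixed-length only if both inputs are fixed with equal length.
    static SqlType leastRestrictive(SqlType a, SqlType b) {
        if (!a.family().equals(b.family())) {
            throw new IllegalArgumentException("incompatible families: " + a + ", " + b);
        }
        boolean fixed = a.fixed() && b.fixed() && a.length() == b.length();
        return new SqlType(a.family(), fixed, Math.max(a.length(), b.length()));
    }

    // For MAP types, widen the key and value types element-wise across all rows.
    static MapType leastRestrictive(List<MapType> rows) {
        MapType acc = rows.get(0);
        for (MapType row : rows.subList(1, rows.size())) {
            acc = new MapType(
                    leastRestrictive(acc.key(), row.key()),
                    leastRestrictive(acc.value(), row.value()));
        }
        return acc;
    }

    public static void main(String[] args) {
        // Types of the headers column in the three VALUES rows, taken from the plans.
        MapType row1 = new MapType(new SqlType("CHAR", true, 2), new SqlType("BINARY", false, 3));
        MapType row2 = new MapType(new SqlType("CHAR", false, MAX), new SqlType("BINARY", false, MAX));
        MapType row3 = new MapType(new SqlType("CHAR", true, 2), new SqlType("BINARY", true, 1));

        // prints (CHAR(2), VARBINARY(3)) MAP
        System.out.println(leastRestrictive(List.of(row1, row3)));
        // prints (VARCHAR(2147483647), VARBINARY(2147483647)) MAP
        System.out.println(leastRestrictive(List.of(row1, row2, row3)));
    }
}
```

> Under this model, any set of rows that includes the first row widens the
> map values to at least VARBINARY(3), never BINARY(1). The same rule is also
> consistent with the first row's values being typed VARBINARY(3) in the plans
> (BINARY(3) for x'C0FFEE' combined with BINARY(2) for x'BABE').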
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>          +- LogicalValues(tuples=[[{ 0 }]])
>
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Reused(reference_id=[1])
>    +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}
> As you can see, the _Abstract Syntax Tree_ section already injects a cast
> for {{headers}} (an unnecessary one, since it should be an identity cast),
> and then the _Optimized Physical Plan_ section injects another cast:
> {code}
> CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers
> {code}
> This makes no sense: it casts the map values first to {{BINARY(1)}} and
> then to {{BYTES}}, which trims the trailing bytes (for example, the last 2
> bytes of x'C0FFEE').
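> To make the data loss concrete, here is a minimal Java sketch of this double
> cast applied to the first row's header values. It assumes, as described
> above, that casting a longer value to BINARY(1) keeps only its first byte.
> {{DoubleCastSketch}}, {{castToBinary}}, and {{castToBytes}} are hypothetical
> helpers, not Flink APIs, and the sketch needs Java 17 for
> {{java.util.HexFormat}}.

```java
import java.util.Arrays;
import java.util.HexFormat;

// Hypothetical model of the double cast in the optimized plan:
// CAST(CAST(v AS BINARY(1)) AS BYTES). Not Flink code.
class DoubleCastSketch {
    // Models CAST(v AS BINARY(n)) for inputs longer than n by keeping only the
    // first n bytes (assumption, matching the trimming described in the report).
    // Padding of shorter inputs is not modeled; they are returned unchanged.
    static byte[] castToBinary(byte[] v, int n) {
        return v.length > n ? Arrays.copyOf(v, n) : v;
    }

    // Models CAST(v AS BYTES): VARBINARY(2147483647) can hold any of these
    // values, so the bytes pass through unchanged.
    static byte[] castToBytes(byte[] v) {
        return v;
    }

    public static void main(String[] args) {
        HexFormat hex = HexFormat.of().withUpperCase();
        // Header values of the first VALUES row.
        for (String literal : new String[] {"C0FFEE", "BABE"}) {
            byte[] original = hex.parseHex(literal);
            byte[] afterDoubleCast = castToBytes(castToBinary(original, 1));
            System.out.println(literal + " -> " + hex.formatHex(afterDoubleCast));
        }
    }
}
```

> Running it prints "C0FFEE -> C0" and "BABE -> BA": each header value loses
> everything after its first byte, even though the final target type
> ({{BYTES}}) could have held the full value.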
> Removing the last row from the INSERT makes the VALUES type inference work correctly:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[$3], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>          +- LogicalValues(tuples=[[{ 0 }]])
>
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    +- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)