[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860506#comment-17860506 ]
xuyang commented on FLINK-20539:
--------------------------------
1. It looks like the original fix did not fully resolve the issue for the Table API.
Briefly, the latest query fails for the following reason:
The ROW type of the *CAST* in the first *sqlQuery* is wrongly {*}FULLY_QUALIFIED{*}.
When the Calcite tree is stored in Flink as a temporary table through
{*}createTemporaryView{*}, that type is converted to Flink's own {*}RowType{*}. When
*sqlQuery* is executed again, the Flink *RowType* is converted back to a Calcite
*Row* with {*}PEEK_FIELDS_NO_EXPAND{*}, which no longer matches the type in the
original Calcite tree.
(*Row* with {color:#FF0000}*FULLY_QUALIFIED*{color} in Calcite -> *RowType* in Flink
-> *Row* with {color:#FF0000}*PEEK_FIELDS_NO_EXPAND*{color} in Calcite)
_Detailed explanation:_
After the first *sqlQuery* is executed, the *ROW* type of the *CAST* in the query has
wrongly become FULLY_QUALIFIED. When *createTemporaryView* is executed, we put the
Calcite tree into {*}PlannerQueryOperation{*}, convert the *FULLY_QUALIFIED* *ROW*
into Flink's *LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) to build the Flink
*ResolvedSchema*, and store it in the catalog manager as a temporary table (i.e.
{*}t1{*}).
When *sqlQuery* is executed again, the ResolvedSchema of the *t1* table has to be
converted into a type that Calcite can recognize
({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this point the cast type
becomes {*}PEEK_FIELDS_NO_EXPAND{*}. The mismatch between the type in the Calcite
tree ({*}FULLY_QUALIFIED{*}) and the type of the *t1* table after the Flink
conversion ({*}PEEK_FIELDS_NO_EXPAND{*}) causes this bug.
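The round trip in point 1 can be modelled as a small, self-contained sketch. It is hypothetical and heavily simplified, not Flink or Calcite code: only the `StructKind` constant names come from Calcite, while `CalciteRow`, `FlinkRowType`, `toLogicalType` and `fromLogicalType` are illustrative stand-ins assumed to behave as described above (the struct kind is dropped on the way into Flink, and every row coming back to Calcite is `PEEK_FIELDS_NO_EXPAND`).

```java
import java.util.List;

public class RowKindRoundTrip {

    // Simplified subset of Calcite's StructKind values involved in this bug.
    enum StructKind { FULLY_QUALIFIED, PEEK_FIELDS_NO_EXPAND }

    // Hypothetical stand-in for a Calcite row type: field names plus a struct kind.
    record CalciteRow(List<String> fields, StructKind kind) {}

    // Hypothetical stand-in for Flink's RowType, which carries no struct kind at all.
    record FlinkRowType(List<String> fields) {}

    // Models FlinkTypeFactory#toLogicalType (assumption): the struct kind is dropped.
    static FlinkRowType toLogicalType(CalciteRow row) {
        return new FlinkRowType(row.fields());
    }

    // Models FlinkTypeFactory#createFieldTypeFromLogicalType (assumption):
    // a row read back from the catalog is always PEEK_FIELDS_NO_EXPAND.
    static CalciteRow fromLogicalType(FlinkRowType type) {
        return new CalciteRow(type.fields(), StructKind.PEEK_FIELDS_NO_EXPAND);
    }

    public static void main(String[] args) {
        // Type of the CAST in the first sqlQuery, wrongly FULLY_QUALIFIED.
        CalciteRow inTree =
            new CalciteRow(List.of("a_val", "b_val"), StructKind.FULLY_QUALIFIED);
        // createTemporaryView stores the Flink type; the second sqlQuery reads it back.
        CalciteRow fromCatalog = fromLogicalType(toLogicalType(inTree));

        System.out.println("tree:    " + inTree.kind());
        System.out.println("catalog: " + fromCatalog.kind());
        // Same fields, different struct kind: the two types no longer compare equal.
        System.out.println("equal:   " + inTree.equals(fromCatalog));
    }
}
```

Running `main` (Java 16+ for records) prints `FULLY_QUALIFIED`, then `PEEK_FIELDS_NO_EXPAND`, then `equal:   false`: the field list survives the round trip, but the struct kind does not.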
2. Separately, I tried the following query. It does not raise an error, but the plan
is slightly wrong (an ITCase with the same kind of query also passes without error).
{code:scala}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |create table t1(
                   |  a int,
                   |  b varchar,
                   |  c as row(a, b)
                   |) with (
                   |  'connector' = 'datagen'
                   |)
      """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual (wrong) plan:
// Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
// +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])

// expected (correct) plan:
// Calc(select=[a, b, CAST(ROW(a, b) AS RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
// +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
{code}
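The only difference between the two plans is the `:peek_no_expand` marker, which comes from how Calcite prints a row type's struct kind into its type string (the same marker separates the validated and converted types in the AssertionError quoted in the description). The sketch below is a hypothetical, standalone re-implementation of just that printing rule for the two struct kinds involved; it is not Calcite's actual `RelRecordType` code, and `digest` is an illustrative helper name.

```java
import java.util.List;
import java.util.stream.Collectors;

public class RowDigest {

    // Simplified subset of Calcite's StructKind values involved in this bug.
    enum StructKind { FULLY_QUALIFIED, PEEK_FIELDS_NO_EXPAND }

    // Hypothetical helper: renders a row of VARCHAR fields the way the plans above print it.
    static String digest(StructKind kind, List<String> fields) {
        StringBuilder sb = new StringBuilder("RecordType");
        if (kind == StructKind.PEEK_FIELDS_NO_EXPAND) {
            // FULLY_QUALIFIED adds no suffix; PEEK_FIELDS_NO_EXPAND is marked explicitly.
            sb.append(":peek_no_expand");
        }
        sb.append('(')
          .append(fields.stream()
              .map(name -> "VARCHAR(2147483647) " + name)
              .collect(Collectors.joining(", ")))
          .append(')');
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> fields = List.of("a_val", "b_val");
        // Type string seen in the actual (wrong) plan.
        System.out.println(digest(StructKind.FULLY_QUALIFIED, fields));
        // Type string expected in the correct plan.
        System.out.println(digest(StructKind.PEEK_FIELDS_NO_EXPAND, fields));
    }
}
```

Since plan tests such as `verifyExecPlan` compare these printed strings, a wrong struct kind shows up as a plan difference even when the query itself executes without error.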
I have now identified the cause of the problem and how to fix it; I am adding some
test cases and will open a PR later. Because row types are represented inconsistently
between Calcite and Flink, I cannot enumerate every query that might hit this kind of
error. If other queries fail with the same error in the future, anyone can link them
to this Jira and I will address them then.
> Type mismatch when using ROW in computed column
> -----------------------------------------------
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Timo Walther
> Assignee: xuyang
> Priority: Major
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
> "CREATE TABLE Orders (\n"
> + " order_number BIGINT,\n"
> + " price INT,\n"
> + " first_name STRING,\n"
> + " last_name STRING,\n"
> + " buyer_name AS ROW(first_name, last_name)\n"
> + ") WITH (\n"
> + " 'connector' = 'datagen'\n"
> + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT
> NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>     at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)