[
https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087800#comment-17087800
]
Jark Wu commented on FLINK-17189:
---------------------------------
Hi all, first of all, I admit it is a bug, but not a bug of TimestampKind
serialization. The properties of DDL should only describe the information of
the DDL, not include the derived DataType of the function. The DataType of the
function should be inferred again by planner after deserialization from
catalog.
However, currently, we directly use the deserialized DataType of the function
which will lose the TimestampKind information. I think we should fix that in
{{DatabaseCalciteSchema}} to re-calcuate the output DataType of functions when
get the CatalogTable from CatalogManager.
> Table with processing time attribute can not be read from Hive catalog
> ----------------------------------------------------------------------
>
> Key: FLINK-17189
> URL: https://issues.apache.org/jira/browse/FLINK-17189
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem, Table SQL / Planner
> Reporter: Timo Walther
> Priority: Major
> Fix For: 1.11.0, 1.10.2
>
>
> DDL:
> {code}
> CREATE TABLE PROD_LINEITEM (
> L_ORDERKEY INTEGER,
> L_PARTKEY INTEGER,
> L_SUPPKEY INTEGER,
> L_LINENUMBER INTEGER,
> L_QUANTITY DOUBLE,
> L_EXTENDEDPRICE DOUBLE,
> L_DISCOUNT DOUBLE,
> L_TAX DOUBLE,
> L_CURRENCY STRING,
> L_RETURNFLAG STRING,
> L_LINESTATUS STRING,
> L_ORDERTIME TIMESTAMP(3),
> L_SHIPINSTRUCT STRING,
> L_SHIPMODE STRING,
> L_COMMENT STRING,
> WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
> L_PROCTIME AS PROCTIME()
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'Lineitem',
> 'connector.properties.zookeeper.connect' = 'not-needed',
> 'connector.properties.bootstrap.servers' = 'kafka:9092',
> 'connector.startup-mode' = 'earliest-offset',
> 'format.type' = 'csv',
> 'format.field-delimiter' = '|'
> );
> {code}
> Query:
> {code}
> SELECT * FROM prod_lineitem;
> {code}
> Result:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve
> datatypes:
> validated type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT,
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL
> L_PROCTIME) NOT NULL
> converted type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT,
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIME
> ATTRIBUTE(PROCTIME) NOT NULL L_PROCTIME) NOT NULL
> rel:
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2],
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6],
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10],
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14],
> L_PROCTIME=[$15])
> LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11,
> 300000:INTERVAL MINUTE)])
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2],
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6],
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10],
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14],
> L_PROCTIME=[PROCTIME()])
> LogicalTableScan(table=[[hcat, default, prod_lineitem, source:
> [KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY,
> L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS,
> L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)