[ 
https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087800#comment-17087800
 ] 

Jark Wu commented on FLINK-17189:
---------------------------------

Hi all, first of all, I agree this is a bug, but it is not a bug in 
TimestampKind serialization. The properties stored for a DDL statement should 
describe only what the DDL itself declares; they should not include the 
DataType derived for a function call (here, PROCTIME()). The planner should 
infer that DataType again after the table is deserialized from the catalog. 

However, we currently use the deserialized DataType of the function directly, 
and that DataType has lost its TimestampKind information. I think we should 
fix this in {{DatabaseCalciteSchema}} by re-calculating the output DataType of 
functions when getting the CatalogTable from the CatalogManager. 
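
To illustrate the idea, here is a minimal Java sketch. It is not Flink code: 
every class and method name in it ({{ComputedColumnTypeSketch}}, 
{{StoredColumn}}, {{trustStoredType}}, {{reinferType}}) is hypothetical, and 
the "inference" only recognizes PROCTIME(). It is meant to show the difference 
between trusting the stored type string and re-deriving the type from the 
column expression, under those assumptions. 

```java
import java.util.Locale;

/** Hypothetical model of the proposed fix; none of these names are Flink APIs. */
final class ComputedColumnTypeSketch {

    /** Stand-in for the time-attribute marker that a plain type string cannot carry. */
    enum TimestampKind { REGULAR, ROWTIME, PROCTIME }

    /** A resolved column type: logical type text plus its time-attribute kind. */
    static final class ResolvedType {
        final String logicalType;
        final TimestampKind kind;

        ResolvedType(String logicalType, TimestampKind kind) {
            this.logicalType = logicalType;
            this.kind = kind;
        }

        @Override
        public String toString() {
            return logicalType + " [" + kind + "]";
        }
    }

    /** What the catalog persists for one column; expression is null for physical columns. */
    static final class StoredColumn {
        final String name;
        final String serializedType;
        final String expression;

        StoredColumn(String name, String serializedType, String expression) {
            this.name = name;
            this.serializedType = serializedType;
            this.expression = expression;
        }
    }

    /** Simplified current behavior: trust the stored type string, so the kind is lost. */
    static ResolvedType trustStoredType(StoredColumn column) {
        return new ResolvedType(column.serializedType, TimestampKind.REGULAR);
    }

    /** Simplified proposed behavior: re-derive computed-column types from the expression. */
    static ResolvedType reinferType(StoredColumn column) {
        if (column.expression == null) {
            // Physical column: the stored type is all there is (rowtime is not modeled here).
            return trustStoredType(column);
        }
        String normalized = column.expression.trim().toUpperCase(Locale.ROOT);
        if (normalized.equals("PROCTIME()")) {
            return new ResolvedType("TIMESTAMP(3) NOT NULL", TimestampKind.PROCTIME);
        }
        // A real planner would run full type inference; this sketch knows one function only.
        throw new UnsupportedOperationException("Unsupported expression: " + column.expression);
    }

    public static void main(String[] args) {
        StoredColumn procTime =
                new StoredColumn("L_PROCTIME", "TIMESTAMP(3) NOT NULL", "PROCTIME()");
        System.out.println("stored:      " + trustStoredType(procTime));
        System.out.println("re-inferred: " + reinferType(procTime));
    }
}
```

In this simplified model, the stored path yields a regular TIMESTAMP(3) for 
L_PROCTIME, which corresponds to the "validated type" in the error below, 
while the re-inferred path keeps the PROCTIME attribute, which corresponds to 
the "converted type". 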

> Table with processing time attribute can not be read from Hive catalog
> ----------------------------------------------------------------------
>
>                 Key: FLINK-17189
>                 URL: https://issues.apache.org/jira/browse/FLINK-17189
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem, Table SQL / Planner
>            Reporter: Timo Walther
>            Priority: Major
>             Fix For: 1.11.0, 1.10.2
>
>
> DDL:
> {code}
> CREATE TABLE PROD_LINEITEM (
>   L_ORDERKEY       INTEGER,
>   L_PARTKEY        INTEGER,
>   L_SUPPKEY        INTEGER,
>   L_LINENUMBER     INTEGER,
>   L_QUANTITY       DOUBLE,
>   L_EXTENDEDPRICE  DOUBLE,
>   L_DISCOUNT       DOUBLE,
>   L_TAX            DOUBLE,
>   L_CURRENCY       STRING,
>   L_RETURNFLAG     STRING,
>   L_LINESTATUS     STRING,
>   L_ORDERTIME      TIMESTAMP(3),
>   L_SHIPINSTRUCT   STRING,
>   L_SHIPMODE       STRING,
>   L_COMMENT        STRING,
>   WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
>   L_PROCTIME       AS PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'Lineitem',
>   'connector.properties.zookeeper.connect' = 'not-needed',
>   'connector.properties.bootstrap.servers' = 'kafka:9092',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv',
>   'format.field-delimiter' = '|'
> );
> {code}
> Query:
> {code}
> SELECT * FROM prod_lineitem;
> {code}
> Result:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve 
> datatypes:
> validated type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME 
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL 
> L_PROCTIME) NOT NULL
> converted type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME 
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIME 
> ATTRIBUTE(PROCTIME) NOT NULL L_PROCTIME) NOT NULL
> rel:
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
> L_PROCTIME=[$15])
>   LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11, 
> 300000:INTERVAL MINUTE)])
>     LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
> L_PROCTIME=[PROCTIME()])
>       LogicalTableScan(table=[[hcat, default, prod_lineitem, source: 
> [KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, 
> L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS, 
> L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
> {code}



