[ 
https://issues.apache.org/jira/browse/FLINK-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17063331#comment-17063331
 ] 

Paul Lin commented on FLINK-16693:
----------------------------------

[~twalthr] Thanks a lot for the information!

After some debugging on the table sink side, I might have found the root cause. 
In the previous version, the logical timestamp type was always converted to 
`Types.SQL_TIMESTAMP`, which matched the query result types of both planners. 
Since FLINK-14645, however, the logical timestamp type is converted to 
`Types.SQL_TIMESTAMP`, `Types.LOCAL_DATE_TIME` or `Types.LOCAL_TIME`, depending 
on the conversion class carried by the logical timestamp type [1]. 
`DescriptorProperties` does not carry the conversion class, so when the 
TableSinkFactory creates the TableSchema, the default conversion class is always 
used. For timestamps, the default is `java.time.LocalDateTime`, which leads to 
`Types.LOCAL_DATE_TIME`.

[1] 
https://github.com/apache/flink/blob/1be88b13cfb14445aaf72635bc51072264f0c32d/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java#L216
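
To make the suspected mechanism easier to follow, here is a minimal, 
self-contained sketch in plain Java. It is a toy model, not Flink code: the 
class `TimestampBridgingSketch`, its `TimestampType` holder, the property key 
`schema.<field>.type` and the precision value are all hypothetical and only 
stand in for the logical timestamp type, `DescriptorProperties` and the legacy 
type conversion discussed above.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

public class TimestampBridgingSketch {

    /** Toy stand-in for a logical TIMESTAMP type plus its conversion class. */
    static final class TimestampType {
        final int precision;
        final Class<?> conversionClass;

        TimestampType(int precision, Class<?> conversionClass) {
            this.precision = precision;
            this.conversionClass = conversionClass;
        }
    }

    /** Models the behavior described above: the legacy type follows the conversion class. */
    static String toLegacyTypeName(TimestampType type) {
        if (type.conversionClass == java.sql.Timestamp.class) {
            return "Types.SQL_TIMESTAMP";
        }
        if (type.conversionClass == LocalDateTime.class) {
            return "Types.LOCAL_DATE_TIME";
        }
        throw new IllegalArgumentException("Unsupported conversion class: " + type.conversionClass);
    }

    /** String properties keep only the logical type; the conversion class is dropped. */
    static Map<String, String> toProperties(String fieldName, TimestampType type) {
        Map<String, String> props = new HashMap<>();
        // Hypothetical key layout, chosen for readability only.
        props.put("schema." + fieldName + ".type", "TIMESTAMP(" + type.precision + ")");
        return props;
    }

    /** Rebuilding from properties can only fall back to the default conversion class. */
    static TimestampType fromProperties(String fieldName, Map<String, String> props) {
        String typeString = props.get("schema." + fieldName + ".type");
        int open = typeString.indexOf('(');
        int close = typeString.indexOf(')');
        int precision = Integer.parseInt(typeString.substring(open + 1, close));
        return new TimestampType(precision, LocalDateTime.class);
    }

    public static void main(String[] args) {
        // The query result is still backed by java.sql.Timestamp.
        TimestampType queryResult = new TimestampType(3, java.sql.Timestamp.class);
        // The sink schema goes through the properties round trip and loses that information.
        TimestampType sinkField =
                fromProperties("login_time", toProperties("login_time", queryResult));

        System.out.println("query result: " + toLegacyTypeName(queryResult));
        System.out.println("table sink:   " + toLegacyTypeName(sinkField));
    }
}
```

In this model the two printed type names differ, which corresponds to the 
mismatch reported by the sink validation: the conversion class does not 
survive the round trip through string properties.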

> Legacy planner incompatible with Timestamp backed by LocalDateTime
> ------------------------------------------------------------------
>
>                 Key: FLINK-16693
>                 URL: https://issues.apache.org/jira/browse/FLINK-16693
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.10.0
>            Reporter: Paul Lin
>            Priority: Major
>
> Recently I upgraded a simple application from 1.9.0 to 1.10.0. It inserts 
> static data into a table, and after the upgrade it hit a timestamp type 
> incompatibility problem during the table sink validation.
> The SQL is like:
> ```
> -- schema: (user_name STRING, user_id INT, login_time TIMESTAMP)
> insert into kafka.test.tbl_a
> select 'ann', 1000, TIMESTAMP '2019-12-30 00:00:00'
> ```
> And the error thrown:
> ```
> Field types of query result and registered TableSink `kafka`.`test`.`tbl_a` do not match.
>       Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
>       TableSink schema:    [user_name: String, user_id: Integer, login_time: LocalDateTime]
> ```
> After some digging, I found that the root cause might be that, since 
> FLINK-14645, timestamp fields defined via TableFactory are bridged to 
> LocalDateTime, while timestamp functions are still backed by 
> java.sql.Timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
