Grzegorz Liter created FLINK-39036:
--------------------------------------
Summary: RowDataToAvroConverter ignores logical type when
converting timestamps
Key: FLINK-39036
URL: https://issues.apache.org/jira/browse/FLINK-39036
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 2.1.1, 2.2.0
Reporter: Grzegorz Liter
When converting RowData into a generic Avro record, the converter ignores the
logical type declared in the Avro schema.
https://github.com/apache/flink/blob/e8ac5b12b4f7188b86cb3ecd9fc9ee5d9ccc3410/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L158
{{case TIMESTAMP_WITHOUT_TIME_ZONE:}}
{{    if (legacyTimestampMapping) {}}
{{        converter =}}
{{            new RowDataToAvroConverter() {}}
{{                private static final long serialVersionUID = 1L;}}
{{                @Override}}
{{                public Object convert(Schema schema, Object object) {}}
{{                    return ((TimestampData) object).toInstant().toEpochMilli();}}
{{                }}}
{{            };}}
{{    } else {}}
{{        converter =}}
{{            new RowDataToAvroConverter() {}}
{{                private static final long serialVersionUID = 1L;}}
{{                @Override}}
{{                public Object convert(Schema schema, Object object) {}}
{{                    return ((TimestampData) object)}}
{{                        .toLocalDateTime()}}
{{                        .toInstant(ZoneOffset.UTC)}}
{{                        .toEpochMilli();}}
{{                }}}
{{            };}}
{{    }}}
If the schema is of type:
{{{"type":"long","adjust-to-utc":true,"logicalType":"timestamp-micros"}}}
the code above still writes the value in milliseconds. If that value is later
interpreted according to the logical type, for example with
{{GenericRecordConverter.convertToSpecific(genericRecord, avroSchema);}}
the resulting timestamp is off by a factor of 1000.
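The unit mismatch can be reproduced with plain java.time, without Flink or Avro on the classpath. The sketch below is only an illustration under stated assumptions: the class TimestampUnitSketch and the helpers toEpochValue and readAsMicros are hypothetical names, not part of Flink, and the switch on the logical type name shows one possible way a converter could honour the schema, not the actual or proposed fix.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class TimestampUnitSketch {

    // Hypothetical helper (not a Flink API): chooses the epoch unit
    // from the Avro logical type name instead of always using millis.
    public static long toEpochValue(LocalDateTime ts, String logicalTypeName) {
        Instant instant = ts.toInstant(ZoneOffset.UTC);
        switch (logicalTypeName) {
            case "timestamp-millis":
            case "local-timestamp-millis":
                return instant.toEpochMilli();
            case "timestamp-micros":
            case "local-timestamp-micros":
                return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
            default:
                throw new IllegalArgumentException(
                        "Unsupported logical type: " + logicalTypeName);
        }
    }

    // Interprets a raw long the way a reader honouring
    // "timestamp-micros" would: as microseconds since the epoch.
    public static Instant readAsMicros(long value) {
        return Instant.EPOCH.plus(value, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 1, 1, 0, 0, 0);

        // What the converter writes today: epoch milliseconds.
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // Read back as micros, the value lands in January 1970.
        System.out.println("millis read as micros: " + readAsMicros(millis));

        // Writing micros for a timestamp-micros schema round-trips.
        long micros = toEpochValue(ts, "timestamp-micros");
        System.out.println("micros read as micros: " + readAsMicros(micros));
    }
}
```

With a timestamp in 2024, the first value printed falls in January 1970, because the millisecond count is 1000 times smaller than the microsecond count the reader expects.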