[
https://issues.apache.org/jira/browse/FLINK-33129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhaoyang Shao updated FLINK-33129:
----------------------------------
Priority: Critical (was: Blocker)
> Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type
> ----------------------------------------------------------------------------
>
> Key: FLINK-33129
> URL: https://issues.apache.org/jira/browse/FLINK-33129
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.17.1
> Reporter: Zhaoyang Shao
> Priority: Critical
> Fix For: 1.17.1
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Creating a converter with `RowDataToAvroConverters.createConverter` for the
> LocalZonedTimestampType logical type throws an exception. This is because
> the switch statement has no case for
> `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE`.
> Code:
> [https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75]
>
> Could we convert the value to `LocalDateTime` and then to `TimestampData`
> using the method below, and then apply the same converter as for
> TIMESTAMP_WITHOUT_TIME_ZONE?
>
> `TimestampData fromLocalDateTime(LocalDateTime dateTime)`
>
> Can the Flink team help add support for this logical type and logical type
> root?
> This is currently a blocker for creating a Flink Iceberg consumer with Avro
> GenericRecord when the Iceberg table has a `TimestampTZ` field, which is
> converted to LocalZonedTimestampType.
> See the error below:
> Unsupported type: TIMESTAMP_LTZ(6)
> stack:
> org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186)
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
> org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224)
> org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178)
> org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.<init>(RowDataToAvroGenericRecordConverter.java:46)
> org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60)
> org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93)
> org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85)
> org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39)
> org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27)
> org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74)
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.lang.Thread.run(Thread.java:829)
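
The idea proposed in the description (turn the value into a `LocalDateTime` and reuse the TIMESTAMP_WITHOUT_TIME_ZONE converter) can be illustrated with a small JDK-only sketch. Everything in it is an assumption made for illustration: `LtzConverterSketch`, its `TypeRoot` enum and its `createConverter` are hypothetical stand-ins rather than Flink classes, values are modeled as `java.time` objects instead of `TimestampData`, and the output is assumed to be epoch milliseconds in UTC. It is not the fix applied in Flink.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.function.Function;

/** Hypothetical stand-in for a converter factory; not Flink's RowDataToAvroConverters. */
final class LtzConverterSketch {

    /** Hypothetical subset of type roots, named after Flink's LogicalTypeRoot constants. */
    enum TypeRoot {
        TIMESTAMP_WITHOUT_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE,
        INTERVAL_DAY_TIME // deliberately left unsupported to show the failure path
    }

    static Function<Object, Object> createConverter(TypeRoot root) {
        switch (root) {
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                // Assumption: wall-clock time is read as UTC and emitted as epoch millis.
                return value ->
                        ((LocalDateTime) value).toInstant(ZoneOffset.UTC).toEpochMilli();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
                // The case the issue reports as missing: convert the instant to a
                // LocalDateTime at UTC, then delegate to the existing timestamp converter.
                Function<Object, Object> timestampConverter =
                        createConverter(TypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
                return value ->
                        timestampConverter.apply(
                                LocalDateTime.ofInstant((Instant) value, ZoneOffset.UTC));
            }
            default:
                // Mirrors the shape of the reported error for roots without a case.
                throw new UnsupportedOperationException("Unsupported type: " + root);
        }
    }
}
```

With this sketch, `createConverter(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)` returns a converter that maps an `Instant` to its epoch milliseconds, while a root without a case still fails fast with an "Unsupported type" exception.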
--
This message was sent by Atlassian Jira
(v8.20.10#820010)