Ahmed Elhassany created FLINK-33039:
---------------------------------------
Summary: Avro Specific Record Logical timestamp is not serialized in Parquet
Key: FLINK-33039
URL: https://issues.apache.org/jira/browse/FLINK-33039
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.1
Reporter: Ahmed Elhassany
I'm trying to write a SpecificRecord that contains a field with a logical timestamp to Parquet files on S3. The field is defined as:
{code:java}
{
  "name": "ts",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}
{code}
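For context: Avro's `timestamp-millis` logical type annotates a `long` holding milliseconds since the Unix epoch (UTC). The exception further down shows that the generated class exposes `ts` as a `java.time.Instant`, so something has to turn that `Instant` into the underlying `long` before it is written. The following is a minimal, stdlib-only sketch of that mapping; the class and method names are hypothetical, and this is not Avro's own conversion class.

```java
import java.time.Instant;

// Hypothetical illustration of the timestamp-millis mapping (not Avro's API).
public class TimestampMillisEncoding {

    // Instant -> underlying Avro long: milliseconds since 1970-01-01T00:00:00Z.
    // Any sub-millisecond precision in the Instant is dropped.
    static long toAvroLong(Instant instant) {
        return instant.toEpochMilli();
    }

    // Underlying Avro long -> Instant.
    static Instant fromAvroLong(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2023-09-05T14:10:02.124Z"); // arbitrary example value
        long encoded = toAvroLong(ts);
        System.out.println(ts + " -> " + encoded);
        // A millisecond-precision Instant survives the round trip unchanged.
        System.out.println(fromAvroLong(encoded).equals(ts));
    }
}
```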
I'm writing it with the following FileSink:
{code:java}
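import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// path is an org.apache.flink.core.fs.Path; MyObj is the Avro-generated SpecificRecord class
final FileSink<MyObj> sinkFlowAggregationAvro =
        FileSink.forBulkFormat(path, AvroParquetWriters.forSpecificRecord(MyObj.class))
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartSuffix(".parquet")
                                .build())
                .build();
{code}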
However, the task managers fail with the following ClassCastException:
{noformat}
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00 Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00 at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:340) ~[blob_p-22acff48719adf70603f57842bd158d7f5538a47-e40c3e350efab078d53261fe2bc38640:?]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-n28zp taskmanager 2023-09-05T16:10:02.124425478+02:00 at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:288) ~[blob_p-22acff48719adf70603f57842bd158d7f5538a47-e40c3e350efab078d53261fe2bc38640:?]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 ... 21 more
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-wt8p9 taskmanager 2023-09-05T16:10:01.868385407+02:00 Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
flink-taskmanager-b467cbff9-2xn5w taskmanager 2023-09-05T16:10:01.871644827+02:00 at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00 at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist-1.17.1.jar:1.17.1]
flink-taskmanager-b467cbff9-pqjqr taskmanager 2023-09-05T16:10:02.276107852+02:00 ... 21 more
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput
flink-taskmanager-b467cbff9-m5gdt taskmanager 2023-09-05T16:10:01.979428558+02:00 ... 21 more
{noformat}
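The top frame, `AvroWriteSupport.writeValueWithoutConversion`, together with the message "class java.time.Instant cannot be cast to class java.lang.Number", suggests that the `Instant` from the record reaches a code path that expects the schema's raw `long`. The stdlib-only sketch below imitates only that failure mode, using hypothetical method names; it does not reproduce parquet-avro's actual implementation. One plausible but unconfirmed cause is that the Avro data model used by the writer has no conversion registered for `timestamp-millis`.

```java
import java.time.Instant;

// Hypothetical illustration of the failure mode (not parquet-avro's code).
public class RawLongWriteSketch {

    // Stand-in for writing an Avro "long" when no logical-type conversion is
    // applied: the value is assumed to already be a Number.
    static long writeLongWithoutConversion(Object value) {
        return ((Number) value).longValue(); // ClassCastException if value is an Instant
    }

    // Stand-in for a conversion-aware path: map Instant to epoch milliseconds first.
    static long writeLongWithConversion(Object value) {
        if (value instanceof Instant) {
            value = ((Instant) value).toEpochMilli(); // autoboxed to Long
        }
        return writeLongWithoutConversion(value);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2023-09-05T14:10:02.124Z"); // arbitrary example value
        try {
            writeLongWithoutConversion(ts);
        } catch (ClassCastException e) {
            System.out.println("Without conversion: " + e.getClass().getSimpleName());
        }
        System.out.println("With conversion: " + writeLongWithConversion(ts));
    }
}
```

Running `main` shows the unconverted path throwing `ClassCastException`, while the converted path returns the epoch-millisecond value.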
--
This message was sent by Atlassian Jira
(v8.20.10#820010)