[ https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498080#comment-17498080 ]

Jing Ge commented on FLINK-26349:
---------------------------------

Hmm, I think this is a question of how the standard AvroParquet integration
works, independent of the Flink format. The issue you found shows that it only
works when the Avro data model used for writing is compatible with the one used
for reading. Specifically, it is not possible to read a Parquet file with
ReflectData if the file was created with GenericData, because ReflectData is a
subclass of GenericData.

> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
>                 Key: FLINK-26349
>                 URL: https://issues.apache.org/jira/browse/FLINK-26349
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Jing Ge
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> I tried to change {{AvroParquetFileReadITCase}} to read the data via
> {{ReflectData}} and stumbled on a problem. I use exactly the same code for
> writing the Parquet files, but changed the reading part to:
> {code}
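> import java.time.Duration;
>
> import org.apache.flink.connector.file.src.FileSource;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
>
>     // Target type the records should be read into via ReflectData.
>     public static final class User {
>         private final String name;
>         private final Integer favoriteNumber;
>         private final String favoriteColor;
>
>         public User(String name, Integer favoriteNumber, String favoriteColor) {
>             this.name = name;
>             this.favoriteNumber = favoriteNumber;
>             this.favoriteColor = favoriteColor;
>         }
>     }
>
>     // Inside the test method; TEMPORARY_FOLDER is the test's temporary
>     // directory that holds the Parquet files written earlier.
>     final FileSource<User> source =
>             FileSource.forRecordStreamFormat(
>                             AvroParquetReaders.forReflectRecord(User.class),
>                             Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                     .monitorContinuously(Duration.ofMillis(5))
>                     .build();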
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>       at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>       at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       ... 30 more
> {code}
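The "Caused by" frames in the trace show Kryo's CollectionSerializer.read calling add on a java.util.Collections$UnmodifiableCollection while rebuilding the reserved field of org.apache.avro.Schema$Field. A generic collection deserializer of this kind roughly creates an empty instance of the recorded collection type and then adds the decoded elements back one by one; that pattern cannot succeed when the recorded type is an unmodifiable wrapper, because add on such a wrapper always throws. The following is a minimal JDK-only sketch of that failure mode. The class and the readInto/tryRead helpers are hypothetical stand-ins, not Kryo's API, and the element values are made up.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of the "instantiate, then add()" deserialization
// pattern; this is not Kryo code.
class UnmodifiableCollectionReadDemo {

    // Obtains an empty target of the recorded collection type, then re-adds
    // every decoded element, similar in spirit to a generic collection serializer.
    static <T> Collection<T> readInto(Supplier<Collection<T>> newTarget, List<T> decoded) {
        Collection<T> target = newTarget.get();
        for (T element : decoded) {
            target.add(element); // always throws if target is an unmodifiable wrapper
        }
        return target;
    }

    // Returns "ok" if readInto succeeds, otherwise the simple name of the exception.
    static String tryRead(Supplier<Collection<String>> newTarget, List<String> decoded) {
        try {
            readInto(newTarget, decoded);
            return "ok";
        } catch (UnsupportedOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        List<String> decoded = Arrays.asList("default", "doc", "name");

        // Recorded type is a plain HashSet: re-adding the elements works.
        System.out.println(tryRead(HashSet::new, decoded)); // prints "ok"

        // Recorded type is an unmodifiable wrapper: the first add() throws.
        System.out.println(tryRead(
                () -> Collections.unmodifiableSet(new HashSet<>()), decoded)); // prints "UnsupportedOperationException"
    }
}
```

Kryo obtains the collection instance through its own instantiation strategy rather than through a supplier, but the failing add call is the same one that appears at the top of the "Caused by" section.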



--
This message was sent by Atlassian Jira
(v8.20.1#820001)