[
https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498100#comment-17498100
]
Jing Ge commented on FLINK-26349:
---------------------------------
Thanks for the input, I will do further research on it.
> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
> Key: FLINK-26349
> URL: https://issues.apache.org/jira/browse/FLINK-26349
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.15.0
> Reporter: Dawid Wysakowicz
> Assignee: Jing Ge
> Priority: Critical
> Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as
> {{ReflectData}} and I stumbled on a problem. The scenario is that I use exact
> same code for writing parquet files, but changed the reading part to:
> {code}
> public static final class User {
> private final String name;
> private final Integer favoriteNumber;
> private final String favoriteColor;
> public User(String name, Integer favoriteNumber, String
> favoriteColor) {
> this.name = name;
> this.favoriteNumber = favoriteNumber;
> this.favoriteColor = favoriteColor;
> }
> }
> final FileSource<User> source =
> FileSource.forRecordStreamFormat(
>
> AvroParquetReaders.forReflectRecord(User.class),
>
> Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
> .monitorContinuously(Duration.ofMillis(5))
> .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG
> org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure
> causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail:
> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> at
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
> at
> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ... 30 more
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)