[ https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503603#comment-17503603 ]
Jing Ge commented on FLINK-26349:
---------------------------------
The user schema in AvroParquetRecordFormatTest is defined only for Avro
GenericRecord. To support reading via ReflectData, the schema needs a
namespace so that the program can locate the class to instantiate through
reflection. I have updated the user schema and added one UT to cover this
case. Thanks for pointing it out.
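As a minimal sketch of the namespace requirement (the class and package names below are illustrative, not the ones used in AvroParquetRecordFormatTest): ReflectData resolves the record class from namespace + name, so the schema either has to declare a matching namespace or be derived from the class directly:
{code:java}
// Sketch only -- ReflectSchemaSketch/User are illustrative names, not the
// actual test classes. ReflectData resolves the target class from
// "namespace" + "name", so the schema must point at a loadable class.
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public final class ReflectSchemaSketch {

    /** Hypothetical POJO; in the test the class lives in the package named by the namespace. */
    public static class User {
        public String name;
        public Integer favoriteNumber;
        public String favoriteColor;
    }

    public static void main(String[] args) {
        // Option 1: derive the schema from the class; ReflectData fills in a
        // namespace that points back at the class, so resolution always works.
        Schema derived = ReflectData.get().getSchema(User.class);
        System.out.println(derived.toString(true));

        // Option 2: declare the namespace explicitly; the reflect read only
        // finds the class if namespace + name matches it.
        Schema declared =
                new Schema.Parser()
                        .parse(
                                "{\"type\":\"record\",\"name\":\"User\","
                                        + "\"namespace\":\"org.apache.flink.formats.parquet.avro\","
                                        + "\"fields\":["
                                        + "{\"name\":\"name\",\"type\":\"string\"},"
                                        + "{\"name\":\"favoriteNumber\",\"type\":[\"int\",\"null\"]},"
                                        + "{\"name\":\"favoriteColor\",\"type\":[\"string\",\"null\"]}]}");
        System.out.println(declared.getFullName());
    }
}
{code}
If the namespace does not resolve to a class on the classpath, the reflect reader falls back to generic records, which is presumably why the stack trace in the description shows {{GenericData$Record}} instances being handed to Kryo.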
------------------------ FYI ----------------------------
The parquet file created with the user schema has the following meta:
{code:java}
creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra: parquet.avro.schema = {"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra: writer.model.name = avro
file schema: User
--------------------------------------------------------------------------------
name: REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:3 TS:143 OFFSET:4
--------------------------------------------------------------------------------
name: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber: INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor: BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
{code}
The parquet file created from the Datum POJO class has the following meta:
{code:java}
creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra: parquet.avro.schema = {"type":"record","name":"Datum","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"}]}
extra: writer.model.name = avro
file schema: org.apache.flink.formats.parquet.avro.Datum
--------------------------------------------------------------------------------
a: REQUIRED BINARY L:STRING R:0 D:0
b: REQUIRED INT32 R:0 D:0
row group 1: RC:3 TS:73 OFFSET:4
--------------------------------------------------------------------------------
a: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:38/38/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: a, max: c, num_nulls: 0]
b: INT32 UNCOMPRESSED DO:0 FPO:42 SZ:35/35/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
{code}
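For comparison, a sketch of how a file like the Datum one could be produced (assuming the AvroParquetWriters factory available in 1.15; the POJO, sink path, and job wiring below are assumptions for illustration, not the test code):
{code:java}
// Sketch under assumptions: Datum and the sink path are illustrative.
// AvroParquetWriters.forReflectRecord derives the Avro schema from the class,
// so the written parquet.avro.schema carries a namespace that a later
// ReflectData read can resolve back to the class.
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ReflectParquetWriteSketch {

    /** Hypothetical POJO standing in for the test's Datum class. */
    public static class Datum {
        public String a;
        public int b;

        public Datum() {}

        public Datum(String a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats roll part files on checkpoints.
        env.enableCheckpointing(500L);

        DataStream<Datum> input =
                env.fromElements(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));

        FileSink<Datum> sink =
                FileSink.forBulkFormat(
                                new Path("/tmp/parquet-reflect-sketch"),
                                AvroParquetWriters.forReflectRecord(Datum.class))
                        .build();

        input.sinkTo(sink);
        env.execute("reflect parquet write sketch");
    }
}
{code}
The important bit is that the schema is derived from the class, so the namespace in the file metadata above comes for free; a hand-written schema without a namespace (as in the User case) cannot be resolved back to a class on the reflect read path.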
> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
> Key: FLINK-26349
> URL: https://issues.apache.org/jira/browse/FLINK-26349
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.15.0
> Reporter: Dawid Wysakowicz
> Assignee: Jing Ge
> Priority: Critical
> Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as
> {{ReflectData}} and I stumbled on a problem. The scenario is that I use the
> exact same code for writing parquet files, but changed the reading part to:
> {code}
> public static final class User {
>     private final String name;
>     private final Integer favoriteNumber;
>     private final String favoriteColor;
>
>     public User(String name, Integer favoriteNumber, String favoriteColor) {
>         this.name = name;
>         this.favoriteNumber = favoriteNumber;
>         this.favoriteColor = favoriteColor;
>     }
> }
>
> final FileSource<User> source =
>         FileSource.forRecordStreamFormat(
>                         AvroParquetReaders.forReflectRecord(User.class),
>                         Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                 .monitorContinuously(Duration.ofMillis(5))
>                 .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail:
> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     ... 30 more
> {code}