[
https://issues.apache.org/jira/browse/HUDI-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Y Ethan Guo updated HUDI-8299:
------------------------------
Description:
We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for a
list-typed field when reading a parquet file generated by a replacecommit
(clustering); see the screenshot below. This caused a NullPointerException during
indexing with a global index using HoodieMergedReadHandle:
{code:java}
org.apache.hudi.exception.HoodieException: Unable to instantiate payload class
  at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.19.jar:?]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.19.jar:?]
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.19.jar:?]
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) ~[spark-core_2.12-3.2.3.jar:3.2.3]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
  at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
  ... 23 more
Caused by: java.lang.NullPointerException: null value for (non-nullable) List<tip_history> at triprec.tip_history
  at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
  at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
  at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
  ... 23 more
Caused by: java.lang.NullPointerException
  at org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
  at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
  at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
  at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
  at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
  ... 23 more
{code}
However, the parquet file itself has non-null values for the field (see the
"tip_history" field statistics in the dump below):
{code:java}
FROM replacecommit:
parquet.avro.schema:
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro
Schema:
message triprec {
..
  required group tip_history (LIST) = 25 {
    repeated group array = 31 {
      required double amount = 32;
      required binary currency (STRING) = 33;
    }
  }
}

Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 12.206 kB  total(uncompressed): 19.198 kB
--------------------------------------------------------------------------------
                            type    encodings  count  avg size  nulls  min / max
tip_history.array.amount    DOUBLE  G   _      123    8.44 B    0      "1.5205662454434887" / "99.8588537812955"
tip_history.array.currency  BINARY  G _ R      123    0.82 B    0      "USD" / "USD" {code}
Comparing this parquet metadata with that of the parquet files generated by the
deltacommit does not reveal any difference in schema:
{code:java}
FROM deltacommit:
parquet.avro.schema:
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro
Schema:
message triprec {
..
  required group tip_history (LIST) {
    repeated group array {
      required double amount;
      required binary currency (STRING);
    }
  }
}

Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 kB  total(uncompressed): 2.990 kB
--------------------------------------------------------------------------------
                            type    encodings  count  avg size  nulls  min / max
tip_history.array.amount    DOUBLE  G   _      12     12.83 B   0      "9.152136419558687" / "98.82597551791528"
tip_history.array.currency  BINARY  G _ R      12     8.33 B    0      "USD" / "USD" {code}
Looking into the parquet reader ("org.apache.parquet.hadoop.InternalParquetRecordReader",
used under the hood for reading the records), we see that whenever the list-typed
field data is lost, the configuration somehow contains
"parquet.avro.write-old-list-structure=false". This causes AvroSchemaConverter to
set "writeOldListStructure" to false (the default value is true if the config is
not set), and that flag controls how the requestedSchema, i.e., the schema used to
read the data, is generated.
{code:java}
// In InternalParquetRecordReader:
ReadSupport.ReadContext readContext = this.readSupport.init(
    new InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = this.requestedSchema.getPaths().size(); {code}
{code:java}
public AvroSchemaConverter(Configuration conf) {
  this.assumeRepeatedIsListElement =
      conf.getBoolean("parquet.avro.add-list-element-records", true);
  this.writeOldListStructure =
      conf.getBoolean("parquet.avro.write-old-list-structure", true);
  this.writeParquetUUID =
      conf.getBoolean("parquet.avro.write-parquet-uuid", false);
  this.readInt96AsFixed =
      conf.getBoolean("parquet.avro.readInt96AsFixed", false);
}

// In private org.apache.parquet.schema.Type convertField(
//     String fieldName, Schema schema, org.apache.parquet.schema.Type.Repetition repetition):
if (type.equals(Type.ARRAY)) {
  if (this.writeOldListStructure) {
    return ConversionPatterns.listType(repetition, fieldName,
        this.convertField("array", schema.getElementType(), Repetition.REPEATED));
  }
  return ConversionPatterns.listOfElements(repetition, fieldName,
      this.convertField("element", schema.getElementType()));
} {code}
When "writeOldListStructure" is set to false, the new list structure with 3
levels is used, which is different from the file schema which uses the 2-level
old list structure in the parquet schema.
{code:java}
required group tip_history (LIST) {
  repeated group list {
    required group element {
      required double amount;
      required binary currency (STRING);
    }
  }
} {code}
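Given that analysis, the mismatch should be reproducible outside of Hudi with plain
parquet-avro. The following is a hedged, untested sketch; the file path is
hypothetical and stands for any file whose parquet schema uses the old 2-level list
structure, like the clustering output above:
{code:java}
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class OldListReadRepro {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Force the read side to request the new 3-level list structure.
    conf.setBoolean("parquet.avro.write-old-list-structure", false);

    // Hypothetical path to a file written with the old 2-level list structure.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("/tmp/clustered-file.parquet"))
            .withConf(conf)
            .build()) {
      GenericRecord record = reader.read();
      // Per the behavior described above, the list-typed field is expected to
      // come back null even though the file contains non-null values.
      System.out.println(record.get("tip_history"));
    }
  }
}
{code}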
This means that elsewhere, on the write path, "writeOldListStructure" is left at its
default of true, so the old list structure is written to the parquet files. We need
to make the setting consistent between the write and read paths to avoid such data
loss on read.
Explicitly setting "parquet.avro.write-old-list-structure=false" in the Hadoop
config for all Hudi Spark jobs works around the issue.
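For reference, a sketch of that workaround, assuming Spark's standard
`spark.hadoop.*` passthrough into the Hadoop configuration (adjust to however the
job builds its session):
{code:java}
import org.apache.spark.sql.SparkSession;

public class WorkaroundConfig {
  public static void main(String[] args) {
    // Set the flag consistently so both the write and read paths use the
    // new 3-level list structure.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-job")
        .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
        .getOrCreate();

    // Equivalently, on an existing session:
    spark.sparkContext().hadoopConfiguration()
        .set("parquet.avro.write-old-list-structure", "false");
  }
}
{code}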
> Different parquet reader config on list-typed fields is used to read parquet file generated by clustering
> ---------------------------------------------------------------------------------------------------------
>
> Key: HUDI-8299
> URL: https://issues.apache.org/jira/browse/HUDI-8299
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.14.1
> Reporter: Y Ethan Guo
> Priority: Critical
> Fix For: 1.0.0
>
> Attachments: Screenshot 2024-10-03 at 14.31.41.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)