[
https://issues.apache.org/jira/browse/HUDI-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-8299:
--------------------------------------
Remaining Estimate: 4h
Original Estimate: 4h
> Different parquet reader config on list-typed fields is used to read parquet
> file generated by clustering
> ---------------------------------------------------------------------------------------------------------
>
> Key: HUDI-8299
> URL: https://issues.apache.org/jira/browse/HUDI-8299
> Project: Apache Hudi
> Issue Type: Sub-task
> Affects Versions: 0.14.1
> Reporter: Y Ethan Guo
> Assignee: Jonathan Vexler
> Priority: Critical
> Fix For: 1.0.2
>
> Attachments: Screenshot 2024-10-03 at 14.31.41.png
>
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for
> a list-typed field when reading a parquet file generated by a replacecommit
> (clustering); see the screenshot below. This causes a NullPointerException
> during indexing with a global index using HoodieMergedReadHandle:
> {code:java}
> org.apache.hudi.exception.HoodieException: Unable to instantiate payload
> class
> at
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
> ~[scala-library-2.12.19.jar:?]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
> ~[scala-library-2.12.19.jar:?]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.19.jar:?]
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_412]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_412]
> at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source)
> ~[?:?]
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:1.8.0_412]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[?:1.8.0_412]
> at
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
> ~[hudi-utilities-bundle_2.12.jar]
> ... 23 more
> Caused by: java.lang.NullPointerException: null value for (non-nullable)
> List<tip_history> at triprec.tip_history
> at
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
> ~[hudi-utilities-bundle_2.12.jar]
> at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source)
> ~[?:?]
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:1.8.0_412]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[?:1.8.0_412]
> at
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
> ~[hudi-utilities-bundle_2.12.jar]
> ... 23 more
> Caused by: java.lang.NullPointerException
> at
> org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> ~[avro-1.11.3.jar:1.11.3]
> at
> org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
> ~[hudi-utilities-bundle_2.12.jar]
> at
> org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
> ~[hudi-utilities-bundle_2.12.jar]
> at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source)
> ~[?:?]
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:1.8.0_412]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[?:1.8.0_412]
> at
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
> ~[hudi-utilities-bundle_2.12.jar]
> ... 23 more
> {code}
> However, the parquet file itself has non-null values for the field (see
> "tip_history" field):
> {code:java}
> FROM replacecommit:
> parquet.avro.schema:
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) = 25 {
>     repeated group array = 31 {
>       required double amount = 32;
>       required binary currency (STRING) = 33;
>     }
>   }
> }
> Row group 0: count: 123 101.62 B records start: 4 total(compressed): 12.206 kB total(uncompressed): 19.198 kB
> --------------------------------------------------------------------------------
> type                        encodings     count  avg size  nulls  min / max
> tip_history.array.amount    DOUBLE G _      123    8.44 B      0  "1.5205662454434887" / "99.8588537812955"
> tip_history.array.currency  BINARY G _ R    123    0.82 B      0  "USD" / "USD" {code}
>
> Comparing this parquet metadata with that of the parquet files generated by
> the deltacommit does not reveal any difference in schema:
> {code:java}
> FROM deltacommit:
> parquet.avro.schema:
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) {
>     repeated group array {
>       required double amount;
>       required binary currency (STRING);
>     }
>   }
> }
> Row group 0: count: 12 269.67 B records start: 4 total(compressed): 3.160 kB total(uncompressed): 2.990 kB
> --------------------------------------------------------------------------------
> type                        encodings     count  avg size  nulls  min / max
> tip_history.array.amount    DOUBLE G _       12   12.83 B      0  "9.152136419558687" / "98.82597551791528"
> tip_history.array.currency  BINARY G _ R     12    8.33 B      0  "USD" / "USD" {code}
>
> Looking into the parquet reader ("org.apache.parquet.hadoop.InternalParquetRecordReader",
> used under the hood for reading the records), we see that whenever the
> list-typed field data is lost, the configuration somehow contains
> "parquet.avro.write-old-list-structure=false". This causes AvroSchemaConverter
> to set "writeOldListStructure" to false (the default is true when the config
> is not set), which controls how the requestedSchema, i.e., the schema used to
> read the data, is generated.
>
> {code:java}
> // InternalParquetRecordReader:
> ReadSupport.ReadContext readContext = this.readSupport.init(
>     new InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
> this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
> this.requestedSchema = readContext.getRequestedSchema();
> this.columnCount = this.requestedSchema.getPaths().size(); {code}
>
> {code:java}
> public AvroSchemaConverter(Configuration conf) {
>   this.assumeRepeatedIsListElement =
>       conf.getBoolean("parquet.avro.add-list-element-records", true);
>   this.writeOldListStructure =
>       conf.getBoolean("parquet.avro.write-old-list-structure", true);
>   this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", false);
>   this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", false);
> }
>
> // In private org.apache.parquet.schema.Type convertField(String fieldName,
> //     Schema schema, org.apache.parquet.schema.Type.Repetition repetition):
> if (type.equals(Type.ARRAY)) {
>   if (this.writeOldListStructure) {
>     return ConversionPatterns.listType(repetition, fieldName,
>         this.convertField("array", schema.getElementType(), Repetition.REPEATED));
>   }
>   return ConversionPatterns.listOfElements(repetition, fieldName,
>       this.convertField("element", schema.getElementType()));
> } {code}
>
> When "writeOldListStructure" is set to false, the new 3-level list structure
> is used in the requestedSchema, which differs from the file schema that uses
> the old 2-level list structure:
> {code:java}
> required group tip_history (LIST) {
>   repeated group list {
>     required group element {
>       required double amount;
>       required binary currency (STRING);
>     }
>   }
> } {code}
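To make the two layouts easy to compare side by side, here is a minimal, dependency-free sketch (the class and helper names are hypothetical, not Hudi or parquet-avro code) that mirrors the convertField branch quoted above by emitting the 2-level and 3-level group shapes for the tip_history field:

```java
public class ListLayoutSketch {
  // Emits the Parquet group shape for a list field of (double amount, string
  // currency), following the same branch structure as convertField above.
  static String convertArrayField(String fieldName, boolean writeOldListStructure) {
    if (writeOldListStructure) {
      // 2-level (old) layout: the repeated group "array" carries the fields directly.
      return "required group " + fieldName + " (LIST) {\n"
          + "  repeated group array {\n"
          + "    required double amount;\n"
          + "    required binary currency (STRING);\n"
          + "  }\n"
          + "}";
    }
    // 3-level (new) layout: a repeated group "list" wraps a required group "element".
    return "required group " + fieldName + " (LIST) {\n"
        + "  repeated group list {\n"
        + "    required group element {\n"
        + "      required double amount;\n"
        + "      required binary currency (STRING);\n"
        + "    }\n"
        + "  }\n"
        + "}";
  }

  public static void main(String[] args) {
    System.out.println(convertArrayField("tip_history", true));
    System.out.println();
    System.out.println(convertArrayField("tip_history", false));
  }
}
```

A reader that builds its requestedSchema with the 3-level shape cannot locate the columns written under the 2-level shape (tip_history.array.amount vs. tip_history.list.element.amount), which is consistent with the null values observed above.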
>
> This means that elsewhere "writeOldListStructure" is left at its default of
> true, so the old list structure is written to the parquet files. We need to
> make this consistent to avoid such data loss on read.
> As a workaround, explicitly setting "parquet.avro.write-old-list-structure=false"
> in the Hadoop config for all Hudi Spark jobs gets around this issue.
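A sketch of that workaround, assuming a Spark job that builds its own SparkSession (the "spark.hadoop." prefix is Spark's standard mechanism for forwarding entries into the Hadoop Configuration; this is a configuration sketch, not tested against a running cluster):

```java
import org.apache.spark.sql.SparkSession;

public class WorkaroundSketch {
  public static void main(String[] args) {
    // Pin the list-structure flag so that writers and readers agree on the
    // 3-level layout; "spark.hadoop."-prefixed entries are copied into the
    // Hadoop Configuration that the parquet reader/writer sees.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-job")
        .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
        .getOrCreate();

    // Equivalently, on an already-created session:
    spark.sparkContext().hadoopConfiguration()
        .set("parquet.avro.write-old-list-structure", "false");
  }
}
```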
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)