[ 
https://issues.apache.org/jira/browse/HUDI-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Y Ethan Guo updated HUDI-8299:
------------------------------
    Description: 
We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for a 
list-typed field when reading a parquet file generated by a replacecommit 
(clustering); see the screenshot below.  This caused a NullPointerException 
during indexing with a global index using HoodieMergedReadHandle
{code:java}
org.apache.hudi.exception.HoodieException: Unable to instantiate payload class 
    at 
org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) 
~[scala-library-2.12.19.jar:?]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) 
~[scala-library-2.12.19.jar:?]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.19.jar:?]
    at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
 ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
 ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:131) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
 ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) 
~[spark-core_2.12-3.2.3.jar:3.2.3]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_412]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_412]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
~[?:?]
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_412]
    at 
org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
 ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
Caused by: java.lang.NullPointerException: null value for (non-nullable) 
List<tip_history> at triprec.tip_history
    at 
org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) 
~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) 
~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
 ~[hudi-utilities-bundle_2.12.jar]
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
~[?:?]
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_412]
    at 
org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
 ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
Caused by: java.lang.NullPointerException
    at 
org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) 
~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
 ~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) 
~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) 
~[avro-1.11.3.jar:1.11.3]
    at 
org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) 
~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
 ~[hudi-utilities-bundle_2.12.jar]
    at 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
 ~[hudi-utilities-bundle_2.12.jar]
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
~[?:?]
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_412]
    at 
org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
 ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
 {code}
However, the parquet file itself has non-null values for the field (see 
"tip_history" field):
{code:java}
FROM replacecommit:
parquet.avro.schema: 
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avroSchema:
message triprec {
  ..
  required group tip_history (LIST) = 25 {
    repeated group array = 31 {
      required double amount = 32;
      required binary currency (STRING) = 33;
    }
  }
}Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 
12.206 kB total(uncompressed):19.198 kB 
--------------------------------------------------------------------------------
                               type      encodings count     avg size   nulls   
min / max
tip_history.array.amount       DOUBLE    G   _     123       8.44 B     0       
"1.5205662454434887" / "99.8588537812955"
tip_history.array.currency     BINARY    G _ R     123       0.82 B     0       
"USD" / "USD" {code}
Comparing the parquet metadata with the parquet files generated by the 
deltacommit does not reveal any difference in schema:
{code:java}
FROM deltacommit:
parquet.avro.schema: 
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro
Schema:
message triprec {
  ..
  required group tip_history (LIST) {
    repeated group array {
      required double amount;
      required binary currency (STRING);
    }
  }
}
Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 
kB total(uncompressed):2.990 kB 
--------------------------------------------------------------------------------
                               type      encodings count     avg size   nulls   
min / max
tip_history.array.amount       DOUBLE    G   _     12        12.83 B    0       
"9.152136419558687" / "98.82597551791528"
tip_history.array.currency     BINARY    G _ R     12        8.33 B     0       
"USD" / "USD" {code}
When looking into the parquet reader 
("org.apache.parquet.hadoop.InternalParquetRecordReader", used under the hood 
for reading the records), we see that whenever the list-typed field data is 
lost, the configuration somehow contains 
"parquet.avro.write-old-list-structure=false", causing AvroSchemaConverter to 
set "writeOldListStructure" to false (the default value is true if the config 
is not set). This flag controls how the requestedSchema is generated, and the 
requestedSchema is what is used to read the data.

 
{code:java}
InternalParquetRecordReader:

ReadSupport.ReadContext readContext = this.readSupport.init(new 
InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = this.requestedSchema.getPaths().size(); {code}
 
{code:java}
public AvroSchemaConverter(Configuration conf) {
  this.assumeRepeatedIsListElement = 
conf.getBoolean("parquet.avro.add-list-element-records", true);
  this.writeOldListStructure = 
conf.getBoolean("parquet.avro.write-old-list-structure", true);
  this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", 
false);
  this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", 
false);
}

In private org.apache.parquet.schema.Type convertField(String fieldName, Schema 
schema, org.apache.parquet.schema.Type.Repetition repetition):

if (type.equals(Type.ARRAY)) {
  if (this.writeOldListStructure) {
    return ConversionPatterns.listType(repetition, fieldName, 
this.convertField("array", schema.getElementType(), Repetition.REPEATED));
  }

  return ConversionPatterns.listOfElements(repetition, fieldName, 
this.convertField("element", schema.getElementType()));
} {code}
 

When "writeOldListStructure" is set to false, the new 3-level list structure is 
used for the requested schema, whereas the file schema uses the old 2-level 
list structure in the parquet schema.

 
{code:java}
  required group tip_history (LIST) {
    repeated group list {
      required group element {
        required double amount;
        required binary currency (STRING);
      }
    }
  } {code}
 

This means that in other places "writeOldListStructure" is left at its default 
of true, so the old list structure is written to the parquet files. The reader 
and writer configuration need to be made consistent to avoid such data loss on 
read.

Explicitly setting "parquet.avro.write-old-list-structure=false" in the Hadoop 
config for all Hudi Spark jobs can get around this issue.

 

  was:
We saw an issue in Hudi 0.14 that `HoodieAvroParquetReader` returns null on the 
list-typed field on reading a parquet file generated by replacecommit 
(clustering), see screenshot below.  This caused NullPointerException

However, the parquet file itself has non-null values for the field (see 
"tip_history" field):
{code:java}
FROM replacecommit:
parquet.avro.schema: 
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avroSchema:
message triprec {
  ..
  required group tip_history (LIST) = 25 {
    repeated group array = 31 {
      required double amount = 32;
      required binary currency (STRING) = 33;
    }
  }
}Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 
12.206 kB total(uncompressed):19.198 kB 
--------------------------------------------------------------------------------
                               type      encodings count     avg size   nulls   
min / max
tip_history.array.amount       DOUBLE    G   _     123       8.44 B     0       
"1.5205662454434887" / "99.8588537812955"
tip_history.array.currency     BINARY    G _ R     123       0.82 B     0       
"USD" / "USD" {code}
Comparing the parquet metadata with the parquet files generated by the 
deltacommit does not reveal any difference in schema:
{code:java}
FROM deltacommit:
parquet.avro.schema: 
{"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro
Schema:
message triprec {
  ..
  required group tip_history (LIST) {
    repeated group array {
      required double amount;
      required binary currency (STRING);
    }
  }
}
Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 
kB total(uncompressed):2.990 kB 
--------------------------------------------------------------------------------
                               type      encodings count     avg size   nulls   
min / max
tip_history.array.amount       DOUBLE    G   _     12        12.83 B    0       
"9.152136419558687" / "98.82597551791528"
tip_history.array.currency     BINARY    G _ R     12        8.33 B     0       
"USD" / "USD" {code}
When looking into the parquet reader 
("org.apache.parquet.hadoop.InternalParquetRecordReader" used under the hood 
for reading the records), we see that whenever the list-typed field data is 
lost, the configuration somehow contains 
"parquet.avro.write-old-list-structure=false" causing AvroSchemaConverter to 
set "writeOldListStructure" to false (default value is true if the config is 
not set), which controls how the requestedSchema is generated (requestedSchema 
is used to read the data).

 
{code:java}
InternalParquetRecordReader:

ReadSupport.ReadContext readContext = this.readSupport.init(new 
InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = this.requestedSchema.getPaths().size(); {code}
 
{code:java}
public AvroSchemaConverter(Configuration conf) {
  this.assumeRepeatedIsListElement = 
conf.getBoolean("parquet.avro.add-list-element-records", true);
  this.writeOldListStructure = 
conf.getBoolean("parquet.avro.write-old-list-structure", true);
  this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", 
false);
  this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", 
false);
}

In private org.apache.parquet.schema.Type convertField(String fieldName, Schema 
schema, org.apache.parquet.schema.Type.Repetition repetition):

if (type.equals(Type.ARRAY)) {
  if (this.writeOldListStructure) {
    return ConversionPatterns.listType(repetition, fieldName, 
this.convertField("array", schema.getElementType(), Repetition.REPEATED));
  }

  return ConversionPatterns.listOfElements(repetition, fieldName, 
this.convertField("element", schema.getElementType()));
} {code}
 

When "writeOldListStructure" is set to false, the new 3-level list structure is 
used for the requested schema, whereas the file schema uses the old 2-level 
list structure in the parquet schema.

 
{code:java}
  required group tip_history (LIST) {
    repeated group list {
      required group element {
        required double amount;
        required binary currency (STRING);
      }
    }
  } {code}
 

This means that in other places "writeOldListStructure" is set to true by 
default, writing old list structure to the parquet files. We need to make this 
consistent to avoid such data loss on read.

Explicitly setting "parquet.avro.write-old-list-structure=false" in the Hadoop 
config for all Hudi Spark jobs can get around this issue.

 


> Different parquet reader config on list-typed fields is used to read parquet 
> file generated by clustering
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-8299
>                 URL: https://issues.apache.org/jira/browse/HUDI-8299
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.14.1
>            Reporter: Y Ethan Guo
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: Screenshot 2024-10-03 at 14.31.41.png
>
>
> We saw an issue in Hudi 0.14 that `HoodieAvroParquetReader` returns null on 
> the list-typed field on reading a parquet file generated by replacecommit 
> (clustering), see screenshot below.  This caused NullPointerException during 
> indexing with global index using HoodieMergedReadHandle
> {code:java}
> org.apache.hudi.exception.HoodieException: Unable to instantiate payload 
> class 
>     at 
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
>  ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) 
> ~[scala-library-2.12.19.jar:?]
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) 
> ~[scala-library-2.12.19.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
> ~[scala-library-2.12.19.jar:?]
>     at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>  ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>  ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) 
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at org.apache.spark.scheduler.Task.run(Task.scala:131) 
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>  ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) 
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) 
> ~[spark-core_2.12-3.2.3.jar:3.2.3]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[?:1.8.0_412]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[?:1.8.0_412]
>     at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
> ~[?:?]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_412]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_412]
>     at 
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
>  ~[hudi-utilities-bundle_2.12.jar]
>     ... 23 more
> Caused by: java.lang.NullPointerException: null value for (non-nullable) 
> List<tip_history> at triprec.tip_history
>     at 
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) 
> ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) 
> ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
> ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
> ~[?:?]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_412]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_412]
>     at 
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
>  ~[hudi-utilities-bundle_2.12.jar]
>     ... 23 more
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) 
> ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
>  ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) 
> ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) 
> ~[avro-1.11.3.jar:1.11.3]
>     at 
> org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) 
> ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) 
> ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at 
> org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53)
>  ~[hudi-utilities-bundle_2.12.jar]
>     at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) 
> ~[?:?]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_412]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_412]
>     at 
> org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99)
>  ~[hudi-utilities-bundle_2.12.jar]
>     ... 23 more
>  {code}
> However, the parquet file itself has non-null values for the field (see 
> "tip_history" field):
> {code:java}
> FROM replacecommit:
> parquet.avro.schema: 
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) = 25 {
>     repeated group array = 31 {
>       required double amount = 32;
>       required binary currency (STRING) = 33;
>     }
>   }
> }
> Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 
> 12.206 kB total(uncompressed):19.198 kB 
> --------------------------------------------------------------------------------
>                                type      encodings count     avg size   nulls 
>   min / max
> tip_history.array.amount       DOUBLE    G   _     123       8.44 B     0     
>   "1.5205662454434887" / "99.8588537812955"
> tip_history.array.currency     BINARY    G _ R     123       0.82 B     0     
>   "USD" / "USD" {code}
> Comparing the parquet metadata with the parquet files generated by the 
> deltacommit does not reveal any difference in schema:
> {code:java}
> FROM deltacommit:
> parquet.avro.schema: 
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) {
>     repeated group array {
>       required double amount;
>       required binary currency (STRING);
>     }
>   }
> }
> Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 
> kB total(uncompressed):2.990 kB 
> --------------------------------------------------------------------------------
>                                type      encodings count     avg size   nulls 
>   min / max
> tip_history.array.amount       DOUBLE    G   _     12        12.83 B    0     
>   "9.152136419558687" / "98.82597551791528"
> tip_history.array.currency     BINARY    G _ R     12        8.33 B     0     
>   "USD" / "USD" {code}
> When looking into the parquet reader 
> ("org.apache.parquet.hadoop.InternalParquetRecordReader" used under the hood 
> for reading the records), we see that whenever the list-typed field data is 
> lost, the configuration somehow contains 
> "parquet.avro.write-old-list-structure=false" causing AvroSchemaConverter to 
> set "writeOldListStructure" to false (default value is true if the config is 
> not set), which controls how the requestedSchema is generated 
> (requestedSchema is used to read the data).
>  
> {code:java}
> InternalParquetRecordReader:
> ReadSupport.ReadContext readContext = this.readSupport.init(new 
> InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
> this.columnIOFactory = new 
> ColumnIOFactory(parquetFileMetadata.getCreatedBy());
> this.requestedSchema = readContext.getRequestedSchema();
> this.columnCount = this.requestedSchema.getPaths().size(); {code}
>  
> {code:java}
> public AvroSchemaConverter(Configuration conf) {
>   this.assumeRepeatedIsListElement = 
> conf.getBoolean("parquet.avro.add-list-element-records", true);
>   this.writeOldListStructure = 
> conf.getBoolean("parquet.avro.write-old-list-structure", true);
>   this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", 
> false);
>   this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", 
> false);
> }
> In private org.apache.parquet.schema.Type convertField(String fieldName, 
> Schema schema, org.apache.parquet.schema.Type.Repetition repetition):
> if (type.equals(Type.ARRAY)) {
>   if (this.writeOldListStructure) {
>     return ConversionPatterns.listType(repetition, fieldName, 
> this.convertField("array", schema.getElementType(), Repetition.REPEATED));
>   }
>   return ConversionPatterns.listOfElements(repetition, fieldName, 
> this.convertField("element", schema.getElementType()));
> } {code}
>  
> When "writeOldListStructure" is set to false, the new 3-level list structure 
> is used, which differs from the file schema, which was written with the old 
> 2-level list structure:
>  
> {code:java}
>   required group tip_history (LIST) {
>     repeated group list {
>       required group element {
>         required double amount;
>         required binary currency (STRING);
>       }
>     }
>   } {code}
>  
> This means that in other places "writeOldListStructure" is left at its 
> default of true, writing the old list structure to the parquet files. We need 
> to make this configuration consistent to avoid such data loss on read.
> Explicitly setting "parquet.avro.write-old-list-structure=false" in the 
> Hadoop config for all Hudi Spark jobs can get around this issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to