[
https://issues.apache.org/jira/browse/HUDI-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Y Ethan Guo updated HUDI-8299:
------------------------------
Affects Version/s: 0.14.1
> Different parquet reader config on list-typed fields is used to read parquet
> file generated by clustering
> ---------------------------------------------------------------------------------------------------------
>
> Key: HUDI-8299
> URL: https://issues.apache.org/jira/browse/HUDI-8299
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.14.1
> Reporter: Y Ethan Guo
> Priority: Critical
> Fix For: 1.0.0
>
> Attachments: Screenshot 2024-10-03 at 14.31.41.png
>
>
> We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for
> a list-typed field when reading a parquet file generated by a replacecommit
> (clustering); see the screenshot below. This caused a NullPointerException.
> However, the parquet file itself has non-null values for the field (see the
> "tip_history" field):
> {code:java}
> FROM replacecommit:
> parquet.avro.schema:
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) = 25 {
>     repeated group array = 31 {
>       required double amount = 32;
>       required binary currency (STRING) = 33;
>     }
>   }
> }
> Row group 0: count: 123 101.62 B records start: 4 total(compressed):
> 12.206 kB total(uncompressed): 19.198 kB
> --------------------------------------------------------------------------------
> type                        encodings     count  avg size  nulls  min / max
> tip_history.array.amount    DOUBLE G _      123    8.44 B      0  "1.5205662454434887" / "99.8588537812955"
> tip_history.array.currency  BINARY G _ R    123    0.82 B      0  "USD" / "USD" {code}
> Comparing this parquet metadata with that of the parquet files generated by
> the deltacommit does not reveal any difference in schema:
> {code:java}
> FROM deltacommit:
> parquet.avro.schema:
> {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
> writer.model.name: avro
> Schema:
> message triprec {
>   ..
>   required group tip_history (LIST) {
>     repeated group array {
>       required double amount;
>       required binary currency (STRING);
>     }
>   }
> }
> Row group 0: count: 12 269.67 B records start: 4 total(compressed): 3.160
> kB total(uncompressed):2.990 kB
> --------------------------------------------------------------------------------
> type                        encodings     count  avg size  nulls  min / max
> tip_history.array.amount    DOUBLE G _       12   12.83 B      0  "9.152136419558687" / "98.82597551791528"
> tip_history.array.currency  BINARY G _ R     12    8.33 B      0  "USD" / "USD" {code}
> When looking into the parquet reader
> ("org.apache.parquet.hadoop.InternalParquetRecordReader", used under the hood
> for reading the records), we see that whenever the list-typed field data is
> lost, the configuration somehow contains
> "parquet.avro.write-old-list-structure=false", causing AvroSchemaConverter to
> set "writeOldListStructure" to false (the default value is true if the config
> is not set). This flag controls how the requestedSchema is generated, and the
> requestedSchema is what is used to read the data.
>
> {code:java}
> // In InternalParquetRecordReader:
> ReadSupport.ReadContext readContext = this.readSupport.init(
>     new InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
> this.columnIOFactory =
>     new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
> this.requestedSchema = readContext.getRequestedSchema();
> this.columnCount = this.requestedSchema.getPaths().size(); {code}
>
> {code:java}
> public AvroSchemaConverter(Configuration conf) {
>   this.assumeRepeatedIsListElement =
>       conf.getBoolean("parquet.avro.add-list-element-records", true);
>   this.writeOldListStructure =
>       conf.getBoolean("parquet.avro.write-old-list-structure", true);
>   this.writeParquetUUID =
>       conf.getBoolean("parquet.avro.write-parquet-uuid", false);
>   this.readInt96AsFixed =
>       conf.getBoolean("parquet.avro.readInt96AsFixed", false);
> }
>
> // In private org.apache.parquet.schema.Type convertField(String fieldName,
> //     Schema schema, org.apache.parquet.schema.Type.Repetition repetition):
> if (type.equals(Type.ARRAY)) {
>   if (this.writeOldListStructure) {
>     return ConversionPatterns.listType(repetition, fieldName,
>         this.convertField("array", schema.getElementType(), Repetition.REPEATED));
>   }
>   return ConversionPatterns.listOfElements(repetition, fieldName,
>       this.convertField("element", schema.getElementType()));
> } {code}
>
> When "writeOldListStructure" is set to false, the new 3-level list structure
> is used in the requested schema, which differs from the file schema that uses
> the old 2-level list structure:
>
> {code:java}
> required group tip_history (LIST) {
>   repeated group list {
>     required group element {
>       required double amount;
>       required binary currency (STRING);
>     }
>   }
> } {code}
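> The mismatch can be reproduced directly with parquet-avro's
> AvroSchemaConverter. This is a minimal sketch (assumptions: parquet-avro is
> on the classpath, and "avroSchema" stands for the table's Avro schema, e.g.
> the triprec schema above): the same Avro array field converts to a 2-level
> or 3-level Parquet structure depending solely on this one config flag.
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.parquet.avro.AvroSchemaConverter;
> import org.apache.parquet.schema.MessageType;
>
> Configuration conf = new Configuration();
>
> // Flag unset or true (the default): old 2-level structure, which matches
> // the file schema written by both the deltacommit and the replacecommit.
> conf.setBoolean("parquet.avro.write-old-list-structure", true);
> MessageType twoLevel = new AvroSchemaConverter(conf).convert(avroSchema);
> // tip_history becomes: repeated group array { ... }
>
> // Flag set to false: new 3-level structure; a requestedSchema built this
> // way no longer lines up with the 2-level file schema, and the list field
> // reads back as null.
> conf.setBoolean("parquet.avro.write-old-list-structure", false);
> MessageType threeLevel = new AvroSchemaConverter(conf).convert(avroSchema);
> // tip_history becomes: repeated group list { required group element { ... } }
> {code}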
>
> This means that elsewhere "writeOldListStructure" is left at its default of
> true, so the old list structure is written to the parquet files. We need to
> make this configuration consistent between the write and read paths to avoid
> such data loss on read.
> Explicitly setting "parquet.avro.write-old-list-structure=false" in the
> Hadoop config for all Hudi Spark jobs can work around this issue.
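> As a hedged illustration of that workaround (assuming the job is launched
> through a SparkSession; the "spark.hadoop." prefix is how Spark propagates
> an entry into the Hadoop Configuration that the parquet reader and writer
> see):
> {code:java}
> import org.apache.spark.sql.SparkSession;
>
> SparkSession spark = SparkSession.builder()
>     // Make all write and read paths agree on the new 3-level list
>     // structure, so the requestedSchema always matches the file schema.
>     .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
>     .getOrCreate();
> {code}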
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)