vinishjail97 opened a new pull request, #11014: URL: https://github.com/apache/hudi/pull/11014
### Change Logs

NOTE: This PR handles only the Avro code paths; a follow-up patch will cover the RowWriter code paths.

There are two problems with BULK_INSERT and partitioners:

1. A user-defined partitioner passed via `hoodie.bulkinsert.user.defined.partitioner.class` is not honoured in the StreamSync code path. Data is therefore written in non-sort mode, which can lead to OOM errors because of too many open write handles. (An illustrative partitioner sketch follows the checklist below.)
2. `RDDCustomColumnsSortPartitioner` sorts the data globally, but too many files are written because the partition keys are not actually prepended to the sort columns. (A sketch of the prepend fix also follows the checklist.)

The unit test fails with this error on the existing code:

```
org.opentest4j.AssertionFailedError:
Expected :654
Actual   :3
```

The failing assertion:

```java
// Verify each partition has one base file because parallelism is 1.
assertEquals(baseFiles.size(), partitions.size());
```

https://github.com/onehouseinc/hudi-internal/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java#L60

```java
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
  final String[] sortColumns = this.sortColumnNames;
  final SerializableSchema schema = this.serializableSchema;
  final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
  return records.sortBy(
      record -> {
        Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
        return FlatLists.ofComparableArray(columnValues);
      },
      true, outputSparkPartitions);
}
```

But `_hoodie_partition_path` is returned as null here by `record.getColumnValues` (debugger screenshots attached below), because the meta fields are only populated at write time by `HoodieAvroParquetWriter`:

https://github.com/onehouseinc/hudi-internal/blob/master/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java#L64

```java
@Override
public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
  if (populateMetaFields) {
    prepRecordWithMetadata(key, avroRecord, instantTime,
        taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
    super.write(avroRecord);
    writeSupport.add(key.getRecordKey());
  } else {
    super.write(avroRecord);
  }
}

default void prepRecordWithMetadata(HoodieKey key, IndexedRecord avroRecord, String instantTime,
                                    Integer partitionId, long recordIndex, String fileName) {
  String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
  HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
  HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
}
```

Attaching the screenshots below, where the `_hoodie_partition_path` column is null.

### Impact

No user-facing impact beyond the fix: the bugs in BULK_INSERT user-defined partitioners are corrected so that the data is sorted correctly.

### Risk level (write none, low medium or high below)

Medium.

### Documentation Update

None.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added if applicable
- [x] CI passed
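For context on problem 1: a user-defined partitioner is any class implementing `BulkInsertPartitioner` and named by `hoodie.bulkinsert.user.defined.partitioner.class`. Below is a minimal illustrative sketch, not part of this PR; the package, class name, and choice of sort key are hypothetical:

```java
package com.example;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical user-defined partitioner: sorts records by partition path so each
// task writes one Hudi partition at a time, bounding the number of concurrently
// open write handles. When the config is ignored (problem 1), these sorting
// guarantees are silently lost.
public class PartitionPathSortPartitioner<T>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                     int outputSparkPartitions) {
    // getPartitionPath() comes from the HoodieKey and is populated before write time.
    return records.sortBy(record -> record.getPartitionPath(), true, outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return true;
  }
}
```

Such a partitioner would be wired in through the write configs, e.g. (using the hypothetical class above):

```
hoodie.datasource.write.operation=bulk_insert
hoodie.bulkinsert.user.defined.partitioner.class=com.example.PartitionPathSortPartitioner
```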
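And for problem 2, a sketch of the direction of the fix: prepend the partition path, read from the record's `HoodieKey` via `getPartitionPath()` rather than from the not-yet-populated `_hoodie_partition_path` meta column, to the sort key. The helper class and method names below are illustrative only; the actual change in this PR may differ:

```java
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.spark.api.java.JavaRDD;

final class SortKeySketch {

  // Sketch of the corrected sort key: the partition path (available on the record
  // before any meta fields are written) is prepended to the user-specified sort
  // columns, so records of the same Hudi partition cluster into the same Spark
  // partitions and each Hudi partition ends up with few base files.
  static <T> JavaRDD<HoodieRecord<T>> sortByPartitionThenColumns(
      JavaRDD<HoodieRecord<T>> records,
      SerializableSchema schema,
      String[] sortColumns,
      boolean consistentLogicalTimestampEnabled,
      int outputSparkPartitions) {
    return records.sortBy(
        record -> {
          Object[] columnValues =
              record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
          Object[] sortKey = new Object[columnValues.length + 1];
          // Use the HoodieKey's partition path, not the null meta column.
          sortKey[0] = record.getPartitionPath();
          System.arraycopy(columnValues, 0, sortKey, 1, columnValues.length);
          return FlatLists.ofComparableArray(sortKey);
        },
        true, outputSparkPartitions);
  }
}
```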
