vinishjail97 opened a new pull request, #11014:
URL: https://github.com/apache/hudi/pull/11014

   ### Change Logs
   
   NOTE: This PR handles only the AVRO code paths; a follow-up patch will cover the RowWriter code paths.
   
   There are two problems with BULK_INSERT and partitioners:
   1. A user-defined partitioner passed via `hoodie.bulkinsert.user.defined.partitioner.class` is not honoured in the StreamSync code path, so the data is written in non-sort mode, which can lead to OOM errors because of too many open write handles.
   2. `RDDCustomColumnsSortPartitioner` sorts the data globally, but too many files are written because the partition path is not actually prepended to the sort columns. With the existing code, the unit test fails with this error:
   ```
   org.opentest4j.AssertionFailedError: 
   Expected :654
   Actual   :3
   <Click to see difference> 
   
   // Verify each partition has one base file because parallelism is 1.
   assertEquals(baseFiles.size(), partitions.size());
   ```
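   For context on problem 1, this is how a user would request a custom partitioner in their write config (the partitioner class name below is illustrative, not a class shipped with Hudi):
   
   ```properties
   # BULK_INSERT write config (partitioner class name is illustrative)
   hoodie.datasource.write.operation=bulk_insert
   hoodie.bulkinsert.user.defined.partitioner.class=com.example.MyCustomSortPartitioner
   ```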
   
   
https://github.com/onehouseinc/hudi-internal/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java#L60
 
   ```java
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
     final String[] sortColumns = this.sortColumnNames;
     final SerializableSchema schema = this.serializableSchema;
     final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
     return records.sortBy(
         record -> {
           Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
           return FlatLists.ofComparableArray(columnValues);
         },
         true, outputSparkPartitions);
   }
   ```
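   The idea of the fix is to prepend the partition path to the user-supplied sort columns, so that records for the same partition stay contiguous and each partition ends up in as few files as the parallelism allows. A minimal, self-contained sketch of that ordering (plain Java with an illustrative `Rec` type, not Hudi's `HoodieRecord`):
   
   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Collectors;
   
   public class PrependPartitionSortSketch {
     // Illustrative stand-in for a record: partition path plus one user sort column.
     record Rec(String partitionPath, String sortValue) {}
   
     // Sorting by the user column alone interleaves partitions across output
     // files; prepending the partition path keeps each partition contiguous,
     // so one open write handle per partition suffices.
     static List<Rec> sortWithPartitionPrefix(List<Rec> records) {
       return records.stream()
           .sorted(Comparator.comparing(Rec::partitionPath)
                             .thenComparing(Rec::sortValue))
           .collect(Collectors.toList());
     }
   
     public static void main(String[] args) {
       List<Rec> input = List.of(
           new Rec("2024/01/02", "b"),
           new Rec("2024/01/01", "a"),
           new Rec("2024/01/02", "a"),
           new Rec("2024/01/01", "b"));
       // Records come out grouped by partition path, sorted within each group.
       for (Rec r : sortWithPartitionPrefix(input)) {
         System.out.println(r.partitionPath() + " " + r.sortValue());
       }
     }
   }
   ```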
   
   But `_hoodie_partition_path` is returned as null here by `record.getColumnValues`, because these meta fields are only populated later, as part of `HoodieAvroParquetWriter`. Debugger screenshots are attached below.
   
https://github.com/onehouseinc/hudi-internal/blob/master/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java#L64
 
   ```java
   @Override
   public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
     if (populateMetaFields) {
       prepRecordWithMetadata(key, avroRecord, instantTime,
           taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
       super.write(avroRecord);
       writeSupport.add(key.getRecordKey());
     } else {
       super.write(avroRecord);
     }
   }
   
   default void prepRecordWithMetadata(HoodieKey key, IndexedRecord avroRecord, String instantTime,
       Integer partitionId, long recordIndex, String fileName) {
     String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
     HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
     HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
   }
   ```
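   To illustrate why sorting on the meta column cannot work at partitioning time: the column is still null for every record (it is filled in only at write time, as shown above), so all sort keys collapse to a single value, whereas the partition path carried on the record key is always available. A self-contained sketch with illustrative stand-in types (not Hudi APIs):
   
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.TreeSet;
   
   public class NullMetaColumnSketch {
     // Illustrative stand-in: the meta column lookup returns null before write
     // time, while the record key always carries the partition path.
     record Rec(String partitionPathFromKey) {
       String metaColumn() { return null; } // _hoodie_partition_path not yet populated
     }
   
     // Sort key built the broken way (meta column) vs. the fixed way (record key).
     static String brokenSortKey(Rec r) { return String.valueOf(r.metaColumn()); }
     static String fixedSortKey(Rec r)  { return r.partitionPathFromKey(); }
   
     public static void main(String[] args) {
       List<Rec> recs = List.of(new Rec("p2"), new Rec("p1"), new Rec("p3"));
   
       // Every broken key is the string "null", so the comparator sees all
       // records as equal and the per-partition grouping is lost.
       Set<String> brokenKeys = new TreeSet<>();
       recs.forEach(r -> brokenKeys.add(brokenSortKey(r)));
       System.out.println(brokenKeys.size()); // prints 1
   
       // Keys taken from the record key keep the three partitions distinct.
       Set<String> fixedKeys = new TreeSet<>();
       recs.forEach(r -> fixedKeys.add(fixedSortKey(r)));
       System.out.println(fixedKeys.size()); // prints 3
     }
   }
   ```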
   Attaching the screenshots below, where the `_hoodie_partition_path` column is null.
   
![image](https://github.com/onehouseinc/hudi-internal/assets/16958856/131fcf7c-7bc6-49db-aa82-d2871bf49a8f)
    
   
   
![image](https://github.com/onehouseinc/hudi-internal/assets/16958856/3bed6d38-cad8-4287-b892-a1ca84b738f6)
   
   
   ### Impact
   
   No breaking impact; fixes the bugs related to BULK_INSERT user-defined partitioners to ensure the data is sorted correctly.
   
   ### Risk level (write none, low medium or high below)
   
   Medium.
   
   ### Documentation Update
   
   None.
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Change Logs and Impact were stated clearly
   - [x] Adequate tests were added if applicable
   - [x] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
