vinishjail97 opened a new pull request, #10869:
URL: https://github.com/apache/hudi/pull/10869

   ### Change Logs
   
   The current logic for computing offset ranges is leading to skews and 
negative offsets because of the way they are calculated. 
   
https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144
 
   
   Problems faced. 
   1. We are calculating eventsPerPartition based on available partitions that 
are not exhausted this can lead to skews where one partition handles only 1-10 
messages and the remaining one handles 100K messages, the idea for 
minPartitions is to increase the parallelism and ensure that each spark task is 
reading approximately the same number of events. 
   2. remainingPartitions can become negative when finalRanges exceeds the size 
of minPartitions. 
   3. Complicated fork in code when minPartitions > toOffsetsMap, this is not 
required IMO and the default minPartitions can always fall back 
toOffsetsMap.size(), this takes care of situations when the partitions increase 
in kafka as well. 
   
   ```
             long remainingEvents = actualNumEvents - allocatedEvents;
             long remainingPartitions = toOffsetMap.size() - 
allocatedPartitionsThisLoop.size();
             // if need tp split into minPartitions, recalculate the 
remainingPartitions
             if (needSplitToMinPartitions) {
               remainingPartitions = minPartitions - finalRanges.size();
             }
             long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) 
/ remainingPartitions);
   ```
   
   New Approach
   1. Find eventsPerPartition which would be Math.max(1L, actualNumEvents / 
minPartitions);
   2. Keep computing offsetRanges unless allocatedEvents < actualNumEvents, 
compute them in a round-robin manner and keep the upper limit of 
eventsPerPartition messages for each range.
   3.  Return all the offsetRanges in the end after sorting them by partition 
   
   ### Impact
   
   No change in public API, the skew in spark task  partitions and bug related 
to negative remainingPartitions when consuming from Kafka is being fixed.
   
   ### Risk level (write none, low medium or high below)
   
   Medium
   
   ### Documentation Update
   
   None, the existing config KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS is 
now being used the way it's supposed to be used. 
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Change Logs and Impact were stated clearly
   - [x] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to