Vinish Reddy created HUDI-7506:
----------------------------------

             Summary: Compute offsetRanges based on eventsPerPartition 
allocated in each range 
                 Key: HUDI-7506
                 URL: https://issues.apache.org/jira/browse/HUDI-7506
             Project: Apache Hudi
          Issue Type: Improvement
          Components: deltastreamer
            Reporter: Vinish Reddy
            Assignee: Vinish Reddy


The current logic for computing offset ranges leads to skew because of the 
way the ranges are calculated 
([Ref|https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144]).

Problems faced:
1. eventsPerPartition is calculated from the partitions that are not yet 
exhausted. This can lead to skew where one partition handles only 1-10 
messages and the remaining one handles 100K messages. The idea behind 
minPartitions is to increase parallelism and ensure that each Spark task 
reads approximately the same number of events. 
2. remainingPartitions can become negative when finalRanges.size() exceeds 
minPartitions. 
3. There is a complicated fork in the code for the case minPartitions > 
toOffsetMap.size(). This is not required IMO: the default minPartitions can 
always fall back to toOffsetMap.size(), which also takes care of situations 
where the number of partitions in Kafka increases. 
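
One way to read the proposal, as a minimal sketch only and not the actual patch: derive a single eventsPerPartition from the total number of pending events and max(minPartitions, number of Kafka partitions), then cut every Kafka partition into ranges of at most that size. The class OffsetRangeSketch, the Range type and computeRanges are hypothetical names for illustration, not Hudi or Spark APIs, and the sketch assumes toOffsets has already been capped to the events that should be read.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetRangeSketch {

  /** Hypothetical stand-in for an offset range: [fromOffset, untilOffset) of one partition. */
  static final class Range {
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(int partition, long fromOffset, long untilOffset) {
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }

    long count() {
      return untilOffset - fromOffset;
    }
  }

  /** Splits each partition into ranges of at most eventsPerPartition events. */
  static List<Range> computeRanges(Map<Integer, Long> fromOffsets,
                                   Map<Integer, Long> toOffsets,
                                   long minPartitions) {
    // Assumption: minPartitions falls back to the Kafka partition count when smaller.
    long targetRanges = Math.max(minPartitions, toOffsets.size());

    // Total pending events across all partitions.
    long totalEvents = 0;
    for (Map.Entry<Integer, Long> e : toOffsets.entrySet()) {
      long from = fromOffsets.getOrDefault(e.getKey(), 0L);
      totalEvents += Math.max(0L, e.getValue() - from);
    }

    // One fixed budget per range, never below 1 so the loop below always advances.
    long eventsPerPartition =
        Math.max(1L, (long) Math.ceil((1.0 * totalEvents) / targetRanges));

    List<Range> ranges = new ArrayList<>();
    for (Map.Entry<Integer, Long> e : toOffsets.entrySet()) {
      long start = fromOffsets.getOrDefault(e.getKey(), 0L);
      long end = e.getValue();
      while (start < end) {
        long size = Math.min(eventsPerPartition, end - start);
        ranges.add(new Range(e.getKey(), start, start + size));
        start += size;
      }
    }
    return ranges;
  }

  public static void main(String[] args) {
    Map<Integer, Long> from = new LinkedHashMap<>();
    Map<Integer, Long> to = new LinkedHashMap<>();
    from.put(0, 0L);
    to.put(0, 10L);        // nearly exhausted partition
    from.put(1, 0L);
    to.put(1, 100_000L);   // heavily loaded partition
    for (Range r : computeRanges(from, to, 4)) {
      System.out.println(r.partition + ": [" + r.fromOffset + ", " + r.untilOffset + ")");
    }
  }
}
```

In this sketch no range ever holds more than eventsPerPartition events, so the large partition is cut into several similar-sized ranges. The number of ranges can end up slightly above minPartitions, and a nearly exhausted partition still yields one small range.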

{{long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();}}
{{// if need to split into minPartitions, recalculate the remainingPartitions}}
{{if (needSplitToMinPartitions) {}}
{{  remainingPartitions = minPartitions - finalRanges.size();}}
{{}}}
{{long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);}}
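
To make problem 2 concrete, the arithmetic of the snippet above can be replayed with plain numbers. This is an illustrative sketch only: RemainingPartitionsDemo and its method are hypothetical, and the input values are made up rather than taken from a real run.

```java
class RemainingPartitionsDemo {

  // Same arithmetic as the quoted snippet for the needSplitToMinPartitions branch.
  static long eventsPerPartition(long remainingEvents, long minPartitions, long finalRangesSize) {
    long remainingPartitions = minPartitions - finalRangesSize;
    return (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
  }

  public static void main(String[] args) {
    // finalRanges.size() < minPartitions: a sensible positive budget.
    System.out.println(eventsPerPartition(100, 3, 2)); // prints 100

    // finalRanges.size() > minPartitions: remainingPartitions is -1,
    // so the per-partition budget itself turns negative.
    System.out.println(eventsPerPartition(100, 3, 4)); // prints -100

    // finalRanges.size() == minPartitions: floating-point division by zero
    // gives Infinity, which the cast to long turns into Long.MAX_VALUE.
    System.out.println(eventsPerPartition(100, 3, 3) == Long.MAX_VALUE); // prints true
  }
}
```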



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
