vinishjail97 opened a new pull request, #10869: URL: https://github.com/apache/hudi/pull/10869
### Change Logs

The current logic for computing offset ranges produces skewed ranges and negative offsets because of the way the ranges are calculated: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144

Problems faced:

1. `eventsPerPartition` is calculated from the partitions that are not yet exhausted. This can cause skew, where one partition handles only 1-10 messages while another handles 100K messages. The purpose of `minPartitions` is to increase parallelism and ensure that each Spark task reads approximately the same number of events.
2. `remainingPartitions` can become negative when `finalRanges.size()` exceeds `minPartitions`.
3. The separate code path for `minPartitions` > `toOffsetMap.size()` is not required IMO. The default `minPartitions` can always fall back to `toOffsetMap.size()`, which also handles the case where the number of Kafka partitions increases.

```java
long remainingEvents = actualNumEvents - allocatedEvents;
long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();
// if need to split into minPartitions, recalculate the remainingPartitions
if (needSplitToMinPartitions) {
  remainingPartitions = minPartitions - finalRanges.size();
}
long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
```

New approach:

1. Compute `eventsPerPartition` as `Math.max(1L, actualNumEvents / minPartitions)`.
2. Keep computing offset ranges while `allocatedEvents < actualNumEvents`, assigning them round-robin across partitions and capping each range at `eventsPerPartition` messages.
3. Return all offset ranges at the end, sorted by partition.

### Impact

No change in public API. This fixes the skew across Spark task partitions and the bug where `remainingPartitions` becomes negative when consuming from Kafka.
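A minimal Java sketch of the new round-robin allocation follows, to make the three steps concrete. It is not the code in this PR: the `OffsetRangeSketch` class, the `computeRanges` method and the `Range` type (a stand-in for Spark's `OffsetRange`) are hypothetical, partitions are plain `int` ids instead of `TopicPartition`, and treating `minPartitions <= 0` as "one range per Kafka partition" is an assumption based on problem 3.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the round-robin offset-range allocation; not Hudi's actual code.
final class OffsetRangeSketch {

  // Hypothetical stand-in for Spark's OffsetRange: [fromOffset, untilOffset) of one partition.
  static final class Range {
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(int partition, long fromOffset, long untilOffset) {
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }

    long count() {
      return untilOffset - fromOffset;
    }

    @Override
    public String toString() {
      return "p" + partition + "[" + fromOffset + "," + untilOffset + ")";
    }
  }

  // Assumes fromOffsets and toOffsets contain the same partition keys.
  static List<Range> computeRanges(Map<Integer, Long> fromOffsets, Map<Integer, Long> toOffsets,
                                   long numEvents, long minPartitions) {
    long totalAvailable = 0;
    for (Map.Entry<Integer, Long> e : toOffsets.entrySet()) {
      totalAvailable += e.getValue() - fromOffsets.get(e.getKey());
    }
    long actualNumEvents = Math.min(totalAvailable, numEvents);
    // Assumption: minPartitions <= 0 falls back to one range per Kafka partition.
    long targetPartitions = Math.max(1L, minPartitions > 0 ? minPartitions : toOffsets.size());
    // Step 1: one fixed budget per range, so every Spark task reads about the same amount.
    long eventsPerPartition = Math.max(1L, actualNumEvents / targetPartitions);

    Map<Integer, Long> cursor = new LinkedHashMap<>(fromOffsets); // next unallocated offset
    List<Range> ranges = new ArrayList<>();
    long allocatedEvents = 0;
    // Step 2: round-robin over partitions while events remain; exhausted partitions are skipped.
    while (allocatedEvents < actualNumEvents) {
      for (Map.Entry<Integer, Long> e : toOffsets.entrySet()) {
        if (allocatedEvents >= actualNumEvents) {
          break;
        }
        int partition = e.getKey();
        long start = cursor.get(partition);
        long toTake = Math.min(eventsPerPartition,
            Math.min(e.getValue() - start, actualNumEvents - allocatedEvents));
        if (toTake <= 0) {
          continue; // this partition has no events left
        }
        ranges.add(new Range(partition, start, start + toTake));
        cursor.put(partition, start + toTake);
        allocatedEvents += toTake;
      }
    }
    // Step 3: sort by partition, then by starting offset.
    ranges.sort(Comparator.comparingInt((Range r) -> r.partition).thenComparingLong(r -> r.fromOffset));
    return ranges;
  }

  public static void main(String[] args) {
    // Skewed input from problem 1: partition 0 has 10 events, partition 1 has 100K.
    Map<Integer, Long> from = new LinkedHashMap<>();
    Map<Integer, Long> to = new LinkedHashMap<>();
    from.put(0, 0L);
    to.put(0, 10L);
    from.put(1, 0L);
    to.put(1, 100_000L);
    System.out.println(computeRanges(from, to, 100_010L, 4L));
  }
}
```

With `minPartitions = 4` this input gives `eventsPerPartition = 25002`, and `main` prints `[p0[0,10), p1[0,25002), p1[25002,50004), p1[50004,75006), p1[75006,100000)]`. No range exceeds the per-range budget however unevenly events are spread across Kafka partitions, and because the budget is fixed up front there is no `remainingPartitions` term that could go negative.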
### Risk level (write none, low medium or high below)

Medium

### Documentation Update

None. The existing config `KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS` is now used the way it is supposed to be used.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added if applicable
- [ ] CI passed
