[
https://issues.apache.org/jira/browse/HUDI-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vinish Reddy updated HUDI-7506:
-------------------------------
Issue Type: Bug (was: Improvement)
> Compute offsetRanges based on eventsPerPartition allocated in each range
> -------------------------------------------------------------------------
>
> Key: HUDI-7506
> URL: https://issues.apache.org/jira/browse/HUDI-7506
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer
> Reporter: Vinish Reddy
> Assignee: Vinish Reddy
> Priority: Critical
> Labels: pull-request-available
>
> The current logic for computing offset ranges leads to skewed ranges and
> negative offsets because of the way the ranges are calculated.
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144]
>
> Problems faced:
> 1. eventsPerPartition is calculated from only the partitions that are not
> yet exhausted. This can lead to skew where one partition handles only
> 1-10 messages while the remaining one handles 100K messages. The idea of
> minPartitions is to increase parallelism and ensure that each Spark task
> reads approximately the same number of events.
> 2. remainingPartitions can become negative when the size of finalRanges
> exceeds minPartitions.
> 3. There is a complicated fork in the code for the case minPartitions >
> toOffsetsMap.size(). This is not required IMO: the default minPartitions
> can always fall back to toOffsetsMap.size(), which also takes care of
> situations where the number of partitions increases in Kafka.
>
> New approach:
> 1. Compute _eventsPerPartition_ as Math.max(1L, actualNumEvents /
> minPartitions).
> 2. Keep computing offsetRanges while allocatedEvents < actualNumEvents,
> assigning them in a round-robin manner across partitions with an upper
> limit of _eventsPerPartition_ messages for each range.
> 3. Return all the offsetRanges at the end, sorted by partition.
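
For illustration, the three steps of the new approach could be sketched roughly as below. This is a minimal sketch and not the actual KafkaOffsetGen code: the class OffsetRangeSketch, the Range type (a stand-in for a Kafka offset range), the method computeOffsetRanges and the maxEvents parameter are all hypothetical names, and the fallback to the partition count when minPartitions is not positive is an assumption based on point 3 of the problems list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; names here are hypothetical, not Hudi APIs.
class OffsetRangeSketch {

  // Hypothetical stand-in for an offset range: [fromOffset, untilOffset).
  static final class Range {
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(int partition, long fromOffset, long untilOffset) {
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }

    long count() {
      return untilOffset - fromOffset;
    }
  }

  static List<Range> computeOffsetRanges(Map<Integer, Long> fromOffsets,
                                         Map<Integer, Long> toOffsets,
                                         long maxEvents,
                                         long minPartitions) {
    // Next unread offset per partition, kept in partition order.
    TreeMap<Integer, Long> cursor = new TreeMap<>();
    long availableEvents = 0L;
    for (Map.Entry<Integer, Long> e : toOffsets.entrySet()) {
      long from = Math.min(fromOffsets.getOrDefault(e.getKey(), 0L), e.getValue());
      cursor.put(e.getKey(), from);
      availableEvents += e.getValue() - from;
    }

    long actualNumEvents = Math.min(availableEvents, maxEvents);
    List<Range> ranges = new ArrayList<>();
    if (actualNumEvents <= 0) {
      return ranges; // nothing to read, also avoids dividing by zero below
    }

    // Assumption: a non-positive minPartitions falls back to the partition count.
    long effectiveMinPartitions = minPartitions > 0 ? minPartitions : toOffsets.size();
    // Step 1: upper limit of events per range.
    long eventsPerPartition = Math.max(1L, actualNumEvents / effectiveMinPartitions);

    // Step 2: round-robin over partitions until every event is allocated.
    long allocatedEvents = 0L;
    while (allocatedEvents < actualNumEvents) {
      for (Map.Entry<Integer, Long> e : cursor.entrySet()) {
        long start = e.getValue();
        long leftInPartition = toOffsets.get(e.getKey()) - start;
        long leftInBudget = actualNumEvents - allocatedEvents;
        long take = Math.min(eventsPerPartition, Math.min(leftInPartition, leftInBudget));
        if (take <= 0) {
          continue; // partition exhausted or budget used up
        }
        ranges.add(new Range(e.getKey(), start, start + take));
        e.setValue(start + take);
        allocatedEvents += take;
      }
    }

    // Step 3: sort by partition, then by starting offset.
    ranges.sort(Comparator.comparingInt((Range r) -> r.partition)
        .thenComparingLong(r -> r.fromOffset));
    return ranges;
  }
}
```

In this sketch a nearly empty partition simply yields one small range, while a large partition is split into several ranges of at most eventsPerPartition events each, so no single range grows far beyond the others. The number of ranges can exceed minPartitions, which is harmless here because no remaining-partition counter is decremented.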
--
This message was sent by Atlassian Jira
(v8.20.10#820010)