[ https://issues.apache.org/jira/browse/HUDI-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinish Reddy updated HUDI-7506:
-------------------------------
Description:
The current logic for computing offset ranges leads to skewed ranges and negative offsets because of the way the ranges are calculated. See
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144]
Problems faced:
1. eventsPerPartition is calculated only from the partitions that are not yet exhausted. This can lead to skews where one partition handles only 1-10 messages while the remaining one handles 100K messages. The purpose of minPartitions is to increase parallelism and ensure that each Spark task reads approximately the same number of events.
2. remainingPartitions can become negative when finalRanges.size() exceeds minPartitions.
3. The code has a complicated fork for the case minPartitions > toOffsetMap.size(). IMO this is not required: minPartitions can always fall back to toOffsetMap.size() by default, which also handles the case where the number of partitions in Kafka increases.
The relevant part of the current calculation:
{code:java}
long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();
// if need to split into minPartitions, recalculate the remainingPartitions
if (needSplitToMinPartitions) {
  remainingPartitions = minPartitions - finalRanges.size();
}
long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
{code}
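To make problems 1 and 2 concrete, the sketch below evaluates only the eventsPerPartition formula from the snippet above; it does not reproduce the surrounding loop in KafkaOffsetGen. The class name LegacyEventsPerPartitionDemo and the input values (minPartitions = 4, six ranges already in finalRanges, 1,000 and 100,000 remaining events) are hypothetical, chosen purely for illustration.

```java
class LegacyEventsPerPartitionDemo {

  // Same arithmetic as the quoted snippet; the rest of the old loop is intentionally omitted.
  static long eventsPerPartition(long remainingEvents, long remainingPartitions) {
    return (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
  }

  public static void main(String[] args) {
    // Hypothetical state: more ranges already emitted than minPartitions (problem 2).
    long minPartitions = 4;
    long finalRangesSize = 6;
    long remainingPartitions = minPartitions - finalRangesSize;
    System.out.println(remainingPartitions);                            // -2
    System.out.println(eventsPerPartition(1_000, remainingPartitions)); // -500

    // Hypothetical state: only one non-exhausted partition is counted, so the
    // formula hands all remaining events to a single range (problem 1).
    System.out.println(eventsPerPartition(100_000, 1));                 // 100000
  }
}
```

A negative remainingPartitions flips the sign of eventsPerPartition, and dividing only by the non-exhausted partitions lets a single range absorb almost all of the remaining events.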
New Approach:
1. Compute _eventsPerPartition_ as Math.max(1L, actualNumEvents / minPartitions).
2. Keep computing offsetRanges while allocatedEvents < actualNumEvents, assigning them to partitions in a round-robin manner and capping each range at _eventsPerPartition_ messages.
3. Return all the offsetRanges at the end, sorted by partition.
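A minimal sketch of the proposed round-robin allocation, under several assumptions: partitions are plain Integer ids rather than Kafka TopicPartition objects, Range is a hypothetical stand-in for an offset range [fromOffset, untilOffset), the names RoundRobinOffsetRanges and computeRanges are illustrative rather than Hudi's API, fromOffsets has an entry for every partition in toOffsets, and the "fall back to toOffsetMap.size()" idea from problem 3 is modelled as Math.max(minPartitions, toOffsets.size()).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RoundRobinOffsetRanges {

  // Hypothetical stand-in for a Kafka offset range: [fromOffset, untilOffset) on one partition.
  record Range(int partition, long fromOffset, long untilOffset) {
    long count() {
      return untilOffset - fromOffset;
    }
  }

  // Assumes fromOffsets contains every partition present in toOffsets.
  static List<Range> computeRanges(Map<Integer, Long> fromOffsets, Map<Integer, Long> toOffsets,
                                   long numEvents, long minPartitions) {
    TreeMap<Integer, Long> to = new TreeMap<>(toOffsets); // sorted => deterministic round-robin
    Map<Integer, Long> next = new HashMap<>(fromOffsets);

    // actualNumEvents: never allocate more than is actually available.
    long available = 0;
    for (Map.Entry<Integer, Long> e : to.entrySet()) {
      available += Math.max(0L, e.getValue() - next.get(e.getKey()));
    }
    long actualNumEvents = Math.min(numEvents, available);

    // Assumption: minPartitions falls back to the partition count (and never drops below 1).
    long targetRanges = Math.max(1L, Math.max(minPartitions, to.size()));
    long eventsPerPartition = Math.max(1L, actualNumEvents / targetRanges);

    List<Range> ranges = new ArrayList<>();
    long allocatedEvents = 0;
    // Each pass gives every non-exhausted partition one range of at most eventsPerPartition.
    while (allocatedEvents < actualNumEvents) {
      for (Map.Entry<Integer, Long> e : to.entrySet()) {
        int partition = e.getKey();
        long start = next.get(partition);
        long take = Math.min(eventsPerPartition,
            Math.min(e.getValue() - start, actualNumEvents - allocatedEvents));
        if (take <= 0) {
          continue; // partition exhausted, or nothing left to allocate
        }
        ranges.add(new Range(partition, start, start + take));
        next.put(partition, start + take);
        allocatedEvents += take;
      }
    }

    // Step 3: return the ranges sorted by partition (then by starting offset).
    ranges.sort(Comparator.comparingInt(Range::partition).thenComparingLong(Range::fromOffset));
    return ranges;
  }

  public static void main(String[] args) {
    // One nearly empty partition and one large partition, as in problem 1.
    Map<Integer, Long> from = Map.of(0, 0L, 1, 0L);
    Map<Integer, Long> to = Map.of(0, 10L, 1, 100_000L);
    for (Range r : computeRanges(from, to, 100_010L, 2)) {
      System.out.println(r);
    }
  }
}
```

With these inputs eventsPerPartition is 50005, and the sketch yields three ranges: partition 0 [0, 10), partition 1 [0, 50005) and partition 1 [50005, 100000). No range exceeds eventsPerPartition, so the large partition is split instead of landing in a single Spark task. Because of integer division, the number of ranges can end up slightly above minPartitions (e.g. 10 events with minPartitions = 3 gives ranges of 3, 3, 3 and 1).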
> Compute offsetRanges based on eventsPerPartition allocated in each range
> -------------------------------------------------------------------------
>
> Key: HUDI-7506
> URL: https://issues.apache.org/jira/browse/HUDI-7506
> Project: Apache Hudi
> Issue Type: Improvement
> Components: deltastreamer
> Reporter: Vinish Reddy
> Assignee: Vinish Reddy
> Priority: Critical
--
This message was sent by Atlassian Jira
(v8.20.10#820010)