vinishjail97 commented on code in PR #10869:
URL: https://github.com/apache/hudi/pull/10869#discussion_r1526598398
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -129,63 +130,56 @@ public static OffsetRange[]
computeOffsetRanges(Map<TopicPartition, Long> fromOf
.toArray(new OffsetRange[toOffsetMap.size()]);
LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents,
minPartitions, ranges);
- boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
- long totalEvents = totalNewMessages(ranges);
- long allocatedEvents = 0;
- Set<Integer> exhaustedPartitions = new HashSet<>();
- List<OffsetRange> finalRanges = new ArrayList<>();
// choose the actualNumEvents with min(totalEvents, numEvents)
- long actualNumEvents = Math.min(totalEvents, numEvents);
-
- // keep going until we have events to allocate and partitions still not
exhausted.
- while (allocatedEvents < numEvents && exhaustedPartitions.size() <
toOffsetMap.size()) {
- // Allocate the remaining events to non-exhausted partitions, in round
robin fashion
- Set<Integer> allocatedPartitionsThisLoop = new
HashSet<>(exhaustedPartitions);
- for (int i = 0; i < ranges.length; i++) {
- long remainingEvents = actualNumEvents - allocatedEvents;
- long remainingPartitions = toOffsetMap.size() -
allocatedPartitionsThisLoop.size();
- // if need tp split into minPartitions, recalculate the
remainingPartitions
- if (needSplitToMinPartitions) {
- remainingPartitions = minPartitions - finalRanges.size();
- }
- long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) /
remainingPartitions);
-
- OffsetRange range = ranges[i];
- if (exhaustedPartitions.contains(range.partition())) {
- continue;
+ long actualNumEvents = Math.min(totalNewMessages(ranges), numEvents);
+ minPartitions = Math.max(minPartitions, toOffsetMap.size());
+ // Each OffsetRange computed will have maximum of eventsPerPartition,
+ // this ensures all ranges are evenly distributed and there's no skew in
one particular range.
+ long eventsPerPartition = Math.max(1L, actualNumEvents / minPartitions);
+ long allocatedEvents = 0;
+ Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
+ // keep going until we have events to allocate.
+ while (allocatedEvents < actualNumEvents) {
+ // Allocate the remaining events in round-robin fashion.
+ for (OffsetRange range : ranges) {
+ // Compute startOffset.
+ long startOffset = range.fromOffset();
+ if (finalRanges.containsKey(range.topicPartition()) &&
!finalRanges.get(range.topicPartition()).isEmpty()) {
+ List<OffsetRange> offsetRangesForPartition =
finalRanges.get(range.topicPartition());
+ startOffset =
offsetRangesForPartition.get(offsetRangesForPartition.size() - 1).untilOffset();
Review Comment:
Addressed as mentioned in the second comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]