nsivabalan commented on code in PR #10869:
URL: https://github.com/apache/hudi/pull/10869#discussion_r1525659089


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -129,63 +130,56 @@ public static OffsetRange[] 
computeOffsetRanges(Map<TopicPartition, Long> fromOf
           .toArray(new OffsetRange[toOffsetMap.size()]);
       LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, 
minPartitions, ranges);
 
-      boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
-      long totalEvents = totalNewMessages(ranges);
-      long allocatedEvents = 0;
-      Set<Integer> exhaustedPartitions = new HashSet<>();
-      List<OffsetRange> finalRanges = new ArrayList<>();
       // choose the actualNumEvents with min(totalEvents, numEvents)
-      long actualNumEvents = Math.min(totalEvents, numEvents);
-
-      // keep going until we have events to allocate and partitions still not 
exhausted.
-      while (allocatedEvents < numEvents && exhaustedPartitions.size() < 
toOffsetMap.size()) {
-        // Allocate the remaining events to non-exhausted partitions, in round 
robin fashion
-        Set<Integer> allocatedPartitionsThisLoop = new 
HashSet<>(exhaustedPartitions);
-        for (int i = 0; i < ranges.length; i++) {
-          long remainingEvents = actualNumEvents - allocatedEvents;
-          long remainingPartitions = toOffsetMap.size() - 
allocatedPartitionsThisLoop.size();
-          // if need tp split into minPartitions, recalculate the 
remainingPartitions
-          if (needSplitToMinPartitions) {
-            remainingPartitions = minPartitions - finalRanges.size();
-          }
-          long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / 
remainingPartitions);
-
-          OffsetRange range = ranges[i];
-          if (exhaustedPartitions.contains(range.partition())) {
-            continue;
+      long actualNumEvents = Math.min(totalNewMessages(ranges), numEvents);
+      minPartitions = Math.max(minPartitions, toOffsetMap.size());
+      // Each OffsetRange computed will have maximum of eventsPerPartition,
+      // this ensures all ranges are evenly distributed and there's no skew in 
one particular range.
+      long eventsPerPartition = Math.max(1L, actualNumEvents / minPartitions);
+      long allocatedEvents = 0;
+      Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
+      // keep going until we have events to allocate.
+      while (allocatedEvents < actualNumEvents) {
+        // Allocate the remaining events in round-robin fashion.
+        for (OffsetRange range : ranges) {
+          // Compute startOffset.
+          long startOffset = range.fromOffset();
+          if (finalRanges.containsKey(range.topicPartition()) && 
!finalRanges.get(range.topicPartition()).isEmpty()) {
+            List<OffsetRange> offsetRangesForPartition = 
finalRanges.get(range.topicPartition());
+            startOffset = 
offsetRangesForPartition.get(offsetRangesForPartition.size() - 1).untilOffset();

Review Comment:
   Should we maintain a hashmap of partition -> latestEndOffset
   rather than going over finalRanges and recomputing it each time?
   That would read simpler.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -129,63 +130,56 @@ public static OffsetRange[] 
computeOffsetRanges(Map<TopicPartition, Long> fromOf
           .toArray(new OffsetRange[toOffsetMap.size()]);
       LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, 
minPartitions, ranges);
 
-      boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
-      long totalEvents = totalNewMessages(ranges);
-      long allocatedEvents = 0;
-      Set<Integer> exhaustedPartitions = new HashSet<>();
-      List<OffsetRange> finalRanges = new ArrayList<>();
       // choose the actualNumEvents with min(totalEvents, numEvents)
-      long actualNumEvents = Math.min(totalEvents, numEvents);
-
-      // keep going until we have events to allocate and partitions still not 
exhausted.
-      while (allocatedEvents < numEvents && exhaustedPartitions.size() < 
toOffsetMap.size()) {
-        // Allocate the remaining events to non-exhausted partitions, in round 
robin fashion
-        Set<Integer> allocatedPartitionsThisLoop = new 
HashSet<>(exhaustedPartitions);
-        for (int i = 0; i < ranges.length; i++) {
-          long remainingEvents = actualNumEvents - allocatedEvents;
-          long remainingPartitions = toOffsetMap.size() - 
allocatedPartitionsThisLoop.size();
-          // if need tp split into minPartitions, recalculate the 
remainingPartitions
-          if (needSplitToMinPartitions) {
-            remainingPartitions = minPartitions - finalRanges.size();
-          }
-          long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / 
remainingPartitions);
-
-          OffsetRange range = ranges[i];
-          if (exhaustedPartitions.contains(range.partition())) {
-            continue;
+      long actualNumEvents = Math.min(totalNewMessages(ranges), numEvents);
+      minPartitions = Math.max(minPartitions, toOffsetMap.size());
+      // Each OffsetRange computed will have maximum of eventsPerPartition,
+      // this ensures all ranges are evenly distributed and there's no skew in 
one particular range.
+      long eventsPerPartition = Math.max(1L, actualNumEvents / minPartitions);
+      long allocatedEvents = 0;
+      Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
+      // keep going until we have events to allocate.
+      while (allocatedEvents < actualNumEvents) {
+        // Allocate the remaining events in round-robin fashion.
+        for (OffsetRange range : ranges) {
+          // Compute startOffset.
+          long startOffset = range.fromOffset();
+          if (finalRanges.containsKey(range.topicPartition()) && 
!finalRanges.get(range.topicPartition()).isEmpty()) {
+            List<OffsetRange> offsetRangesForPartition = 
finalRanges.get(range.topicPartition());
+            startOffset = 
offsetRangesForPartition.get(offsetRangesForPartition.size() - 1).untilOffset();
           }
-
+          // Compute toOffset.
           long toOffset = -1L;
-          if (range.fromOffset() + eventsPerPartition > range.fromOffset()) {
-            toOffset = Math.min(range.untilOffset(), range.fromOffset() + 
eventsPerPartition);
+          if (startOffset + eventsPerPartition > startOffset) {
+            toOffset = Math.min(range.untilOffset(), startOffset + 
eventsPerPartition);
           } else {
             // handling Long overflow
             toOffset = range.untilOffset();
           }
-          if (toOffset == range.untilOffset()) {
-            exhaustedPartitions.add(range.partition());
-          }
-          // We need recompute toOffset if we have allocatedEvents are more 
than actualNumEvents.
-          long totalAllocatedEvents = allocatedEvents + (toOffset - 
range.fromOffset());
+          // We need recompute toOffset if we allocatedEvents are more than 
actualNumEvents.
+          long totalAllocatedEvents = allocatedEvents + (toOffset - 
startOffset);
           if (totalAllocatedEvents > actualNumEvents) {
             long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents 
- allocatedEvents));
-            toOffset = Math.min(range.untilOffset(), range.fromOffset() + 
offsetsToAdd);
+            toOffset = Math.min(range.untilOffset(), startOffset + 
offsetsToAdd);

Review Comment:
   L160 to 164 seem complex.
   I tried simplifying this entire code block. Can you check it out?
   ```
        // choose the actualNumEvents with min(totalEvents, numEvents)
         long actualNumEvents = Math.min(totalNewMessages(ranges), numEvents);
         minPartitions = Math.max(minPartitions, toOffsetMap.size());
         // Each OffsetRange computed will have maximum of eventsPerPartition,
         // this ensures all ranges are evenly distributed and there's no skew 
in one particular range.
         long eventsPerPartition = Math.max(1L, actualNumEvents / 
minPartitions);
         long allocatedEvents = 0;
         Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
         Map<TopicPartition, Long> partitionToAllocatedOffset = new HashMap<>();
         // keep going until we have events to allocate.
         while (allocatedEvents < actualNumEvents) {
           // Allocate the remaining events in round-robin fashion.
           for (OffsetRange range : ranges) {
             // if we have already allocated required no of events, exit
             if (allocatedEvents == actualNumEvents) {
               break;
             }
             // Compute startOffset.
             long startOffset = range.fromOffset();
             if 
(partitionToAllocatedOffset.containsKey(range.topicPartition())) {
               startOffset = 
partitionToAllocatedOffset.get(range.topicPartition());
             }
   
             // for last bucket, we may not have full eventsPerPartition msgs.
             long eventsForThisPartition = Math.min(eventsPerPartition, 
(actualNumEvents - allocatedEvents));
             // Compute toOffset.
             long toOffset = -1L;
             if (startOffset + eventsForThisPartition > startOffset) {
               toOffset = Math.min(range.untilOffset(), startOffset + 
eventsForThisPartition);
             } else {
               // handling Long overflow
               toOffset = range.untilOffset();
             }
             allocatedEvents += toOffset - startOffset;
             OffsetRange thisRange = OffsetRange.create(range.topicPartition(), 
startOffset, toOffset);
             // Add the offsetRange(startOffset,toOffset) to finalRanges.
             if (!finalRanges.containsKey(range.topicPartition())) {
               finalRanges.put(range.topicPartition(), new 
ArrayList<>(Collections.singleton(thisRange)));
               partitionToAllocatedOffset.put(range.topicPartition(), 
thisRange.untilOffset());
             } else if (toOffset > startOffset) {
               finalRanges.get(range.topicPartition()).add(thisRange);
               partitionToAllocatedOffset.put(range.topicPartition(), 
thisRange.untilOffset());
             }
            }
         }
   ```
   
   All the existing tests succeed, and it is simpler to read and maintain.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to