This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 292b1d850dbce784826c8ca1194146206a7965a1
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Feb 28 17:43:35 2024 +0530

    [HUDI-7450] Address minor comments on Fix offset computation bug when 
allocedEvents > actualNumEvents (#10771)
    
    * Address comments
    
    * Addressed comments
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 15 ++++++------
 .../sources/helpers/TestCheckpointUtils.java       | 27 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 9142f9d1968..57f5d38dd7c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -132,18 +132,18 @@ public class KafkaOffsetGen {
 
       boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
       long totalEvents = totalNewMessages(ranges);
-      long allocedEvents = 0;
+      long allocatedEvents = 0;
       Set<Integer> exhaustedPartitions = new HashSet<>();
       List<OffsetRange> finalRanges = new ArrayList<>();
       // choose the actualNumEvents with min(totalEvents, numEvents)
       long actualNumEvents = Math.min(totalEvents, numEvents);
 
       // keep going until we have events to allocate and partitions still not 
exhausted.
-      while (allocedEvents < numEvents && exhaustedPartitions.size() < 
toOffsetMap.size()) {
+      while (allocatedEvents < numEvents && exhaustedPartitions.size() < 
toOffsetMap.size()) {
         // Allocate the remaining events to non-exhausted partitions, in round 
robin fashion
         Set<Integer> allocatedPartitionsThisLoop = new 
HashSet<>(exhaustedPartitions);
         for (int i = 0; i < ranges.length; i++) {
-          long remainingEvents = actualNumEvents - allocedEvents;
+          long remainingEvents = actualNumEvents - allocatedEvents;
           long remainingPartitions = toOffsetMap.size() - 
allocatedPartitionsThisLoop.size();
           // if need tp split into minPartitions, recalculate the 
remainingPartitions
           if (needSplitToMinPartitions) {
@@ -166,12 +166,13 @@ public class KafkaOffsetGen {
           if (toOffset == range.untilOffset()) {
             exhaustedPartitions.add(range.partition());
           }
-          // We need recompute toOffset if allocedEvents larger than 
actualNumEvents.
-          if (allocedEvents + (toOffset - range.fromOffset()) > 
actualNumEvents) {
-            long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents 
- allocedEvents));
+          // We need to recompute toOffset if allocating this range would push the 
total allocated events past actualNumEvents.
+          long totalAllocatedEvents = allocatedEvents + (toOffset - 
range.fromOffset());
+          if (totalAllocatedEvents > actualNumEvents) {
+            long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents 
- allocatedEvents));
             toOffset = Math.min(range.untilOffset(), range.fromOffset() + 
offsetsToAdd);
           }
-          allocedEvents = allocedEvents + (toOffset - range.fromOffset());
+          allocatedEvents += toOffset - range.fromOffset();
           OffsetRange thisRange = OffsetRange.create(range.topicPartition(), 
range.fromOffset(), toOffset);
           finalRanges.add(thisRange);
           ranges[i] = OffsetRange.create(range.topicPartition(), 
range.fromOffset() + thisRange.count(), range.untilOffset());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
index 9c7b764c267..b77fb15803f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -266,6 +267,32 @@ public class TestCheckpointUtils {
         throw new IllegalArgumentException("Invalid offset range " + range);
       }
     }
+    OffsetRange[] expectedRanges = new OffsetRange[] {
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 76888767, 76908767),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 76908767, 76928767),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 76928767, 76948767),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 76948767, 76970879),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 76970879, 76992990),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 76725043, 76745043),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 76745043, 76765043),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 76765043, 76768151),
+        OffsetRange.apply(TEST_TOPIC_NAME, 2, 76899767, 76919767),
+        OffsetRange.apply(TEST_TOPIC_NAME, 2, 76919767, 76939767),
+        OffsetRange.apply(TEST_TOPIC_NAME, 2, 76939767, 76961879),
+        OffsetRange.apply(TEST_TOPIC_NAME, 2, 76961879, 76983990),
+        OffsetRange.apply(TEST_TOPIC_NAME, 2, 76983990, 76983990),
+        OffsetRange.apply(TEST_TOPIC_NAME, 3, 76833267, 76853267),
+        OffsetRange.apply(TEST_TOPIC_NAME, 3, 76853267, 76873267),
+        OffsetRange.apply(TEST_TOPIC_NAME, 3, 76873267, 76895379),
+        OffsetRange.apply(TEST_TOPIC_NAME, 3, 76895379, 76917490),
+        OffsetRange.apply(TEST_TOPIC_NAME, 3, 76917490, 76917490),
+        OffsetRange.apply(TEST_TOPIC_NAME, 4, 76952055, 76972055),
+        OffsetRange.apply(TEST_TOPIC_NAME, 4, 76972055, 76992055),
+        OffsetRange.apply(TEST_TOPIC_NAME, 4, 76992055, 77014167),
+        OffsetRange.apply(TEST_TOPIC_NAME, 4, 77014167, 77036278),
+        OffsetRange.apply(TEST_TOPIC_NAME, 4, 77036278, 77036278),
+    };
+    assertArrayEquals(expectedRanges, ranges);
   }
 
   private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, 
long[] offsets) {

Reply via email to