This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fbe4d371632bed012c107ee8aae6042d1845c13e
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Feb 27 23:12:40 2024 +0530

    [HUDI-7450] Fix offset computation bug when allocedEvents > actualNumEvents (#10768)
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  6 +++---
 .../sources/helpers/TestCheckpointUtils.java       | 23 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 32df651d556..9142f9d1968 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -166,12 +166,12 @@ public class KafkaOffsetGen {
           if (toOffset == range.untilOffset()) {
             exhaustedPartitions.add(range.partition());
           }
-          allocedEvents += toOffset - range.fromOffset();
           // We need recompute toOffset if allocedEvents larger than actualNumEvents.
-          if (allocedEvents > actualNumEvents) {
+          if (allocedEvents + (toOffset - range.fromOffset()) > actualNumEvents) {
             long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents));
-            toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd);
+            toOffset = Math.min(range.untilOffset(), range.fromOffset() + offsetsToAdd);
           }
+          allocedEvents = allocedEvents + (toOffset - range.fromOffset());
           OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
           finalRanges.add(thisRange);
           ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
index 49e27d0191b..9c7b764c267 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
@@ -245,6 +245,29 @@ public class TestCheckpointUtils {
     assertEquals(300, mergedRanges[0].untilOffset());
   }
 
+  @Test
+  public void testNumAllocatedEventsGreaterThanNumActualEvents() {
+    int[] partitions = new int[] {0, 1, 2, 3, 4};
+    long[] committedOffsets =
+        new long[] {76888767, 76725043, 76899767, 76833267, 76952055};
+    long[] latestOffsets =
+        new long[] {77005407, 76768151, 76985456, 76917973, 77080447};
+    OffsetRange[] ranges =
+        KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
+            makeOffsetMap(partitions, committedOffsets),
+            makeOffsetMap(partitions, latestOffsets),
+            400000,
+            20);
+
+    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges);
+    assertEquals(400000, totalNewMsgs);
+    for (OffsetRange range : ranges) {
+      if (range.fromOffset() > range.untilOffset()) {
+        throw new IllegalArgumentException("Invalid offset range " + range);
+      }
+    }
+  }
+
   private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
     Map<TopicPartition, Long> map = new HashMap<>();
     for (int i = 0; i < partitions.length; i++) {

Reply via email to