This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fbe4d371632bed012c107ee8aae6042d1845c13e Author: Vinish Reddy <[email protected]> AuthorDate: Tue Feb 27 23:12:40 2024 +0530 [HUDI-7450] Fix offset computation bug when allocedEvents > actualNumEvents (#10768) --- .../utilities/sources/helpers/KafkaOffsetGen.java | 6 +++--- .../sources/helpers/TestCheckpointUtils.java | 23 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 32df651d556..9142f9d1968 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -166,12 +166,12 @@ public class KafkaOffsetGen { if (toOffset == range.untilOffset()) { exhaustedPartitions.add(range.partition()); } - allocedEvents += toOffset - range.fromOffset(); // We need recompute toOffset if allocedEvents larger than actualNumEvents. - if (allocedEvents > actualNumEvents) { + if (allocedEvents + (toOffset - range.fromOffset()) > actualNumEvents) { long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents)); - toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd); + toOffset = Math.min(range.untilOffset(), range.fromOffset() + offsetsToAdd); } + allocedEvents = allocedEvents + (toOffset - range.fromOffset()); OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset); finalRanges.add(thisRange); ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java index 49e27d0191b..9c7b764c267 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java @@ -245,6 +245,29 @@ public class TestCheckpointUtils { assertEquals(300, mergedRanges[0].untilOffset()); } + @Test + public void testNumAllocatedEventsGreaterThanNumActualEvents() { + int[] partitions = new int[] {0, 1, 2, 3, 4}; + long[] committedOffsets = + new long[] {76888767, 76725043, 76899767, 76833267, 76952055}; + long[] latestOffsets = + new long[] {77005407, 76768151, 76985456, 76917973, 77080447}; + OffsetRange[] ranges = + KafkaOffsetGen.CheckpointUtils.computeOffsetRanges( + makeOffsetMap(partitions, committedOffsets), + makeOffsetMap(partitions, latestOffsets), + 400000, + 20); + + long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges); + assertEquals(400000, totalNewMsgs); + for (OffsetRange range : ranges) { + if (range.fromOffset() > range.untilOffset()) { + throw new IllegalArgumentException("Invalid offset range " + range); + } + } + } + private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) { Map<TopicPartition, Long> map = new HashMap<>(); for (int i = 0; i < partitions.length; i++) {
