This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 292b1d850dbce784826c8ca1194146206a7965a1 Author: Vinish Reddy <[email protected]> AuthorDate: Wed Feb 28 17:43:35 2024 +0530 [HUDI-7450] Address minor comments on Fix offset computation bug when allocedEvents > actualNumEvents (#10771) * Address comments * Addressed comments --- .../utilities/sources/helpers/KafkaOffsetGen.java | 15 ++++++------ .../sources/helpers/TestCheckpointUtils.java | 27 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 9142f9d1968..57f5d38dd7c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -132,18 +132,18 @@ public class KafkaOffsetGen { boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size(); long totalEvents = totalNewMessages(ranges); - long allocedEvents = 0; + long allocatedEvents = 0; Set<Integer> exhaustedPartitions = new HashSet<>(); List<OffsetRange> finalRanges = new ArrayList<>(); // choose the actualNumEvents with min(totalEvents, numEvents) long actualNumEvents = Math.min(totalEvents, numEvents); // keep going until we have events to allocate and partitions still not exhausted. 
- while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) { + while (allocatedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) { // Allocate the remaining events to non-exhausted partitions, in round robin fashion Set<Integer> allocatedPartitionsThisLoop = new HashSet<>(exhaustedPartitions); for (int i = 0; i < ranges.length; i++) { - long remainingEvents = actualNumEvents - allocedEvents; + long remainingEvents = actualNumEvents - allocatedEvents; long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size(); // if need tp split into minPartitions, recalculate the remainingPartitions if (needSplitToMinPartitions) { @@ -166,12 +166,13 @@ public class KafkaOffsetGen { if (toOffset == range.untilOffset()) { exhaustedPartitions.add(range.partition()); } - // We need recompute toOffset if allocedEvents larger than actualNumEvents. - if (allocedEvents + (toOffset - range.fromOffset()) > actualNumEvents) { - long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents)); + // We need to recompute toOffset if allocatedEvents is more than actualNumEvents. 
+ long totalAllocatedEvents = allocatedEvents + (toOffset - range.fromOffset()); + if (totalAllocatedEvents > actualNumEvents) { + long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocatedEvents)); toOffset = Math.min(range.untilOffset(), range.fromOffset() + offsetsToAdd); } - allocedEvents = allocedEvents + (toOffset - range.fromOffset()); + allocatedEvents += toOffset - range.fromOffset(); OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset); finalRanges.add(thisRange); ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java index 9c7b764c267..b77fb15803f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -266,6 +267,32 @@ public class TestCheckpointUtils { throw new IllegalArgumentException("Invalid offset range " + range); } } + OffsetRange[] expectedRanges = new OffsetRange[] { + OffsetRange.apply(TEST_TOPIC_NAME, 0, 76888767, 76908767), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 76908767, 76928767), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 76928767, 76948767), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 76948767, 76970879), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 76970879, 76992990), + OffsetRange.apply(TEST_TOPIC_NAME, 1, 76725043, 76745043), + OffsetRange.apply(TEST_TOPIC_NAME, 1, 76745043, 76765043), + OffsetRange.apply(TEST_TOPIC_NAME, 1, 76765043, 76768151), + 
OffsetRange.apply(TEST_TOPIC_NAME, 2, 76899767, 76919767), + OffsetRange.apply(TEST_TOPIC_NAME, 2, 76919767, 76939767), + OffsetRange.apply(TEST_TOPIC_NAME, 2, 76939767, 76961879), + OffsetRange.apply(TEST_TOPIC_NAME, 2, 76961879, 76983990), + OffsetRange.apply(TEST_TOPIC_NAME, 2, 76983990, 76983990), + OffsetRange.apply(TEST_TOPIC_NAME, 3, 76833267, 76853267), + OffsetRange.apply(TEST_TOPIC_NAME, 3, 76853267, 76873267), + OffsetRange.apply(TEST_TOPIC_NAME, 3, 76873267, 76895379), + OffsetRange.apply(TEST_TOPIC_NAME, 3, 76895379, 76917490), + OffsetRange.apply(TEST_TOPIC_NAME, 3, 76917490, 76917490), + OffsetRange.apply(TEST_TOPIC_NAME, 4, 76952055, 76972055), + OffsetRange.apply(TEST_TOPIC_NAME, 4, 76972055, 76992055), + OffsetRange.apply(TEST_TOPIC_NAME, 4, 76992055, 77014167), + OffsetRange.apply(TEST_TOPIC_NAME, 4, 77014167, 77036278), + OffsetRange.apply(TEST_TOPIC_NAME, 4, 77036278, 77036278), + }; + assertArrayEquals(expectedRanges, ranges); } private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
