This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 63a3e2fe28ead18a388b0873e26a2f8ebc285e69 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Thu Apr 25 21:19:24 2024 -0700 [HUDI-7511] Fixing offset range calculation for kafka (#10875) Co-authored-by: Balaji Varadarajan <[email protected]> --- .../utilities/sources/helpers/KafkaOffsetGen.java | 10 +++++++++ .../sources/helpers/TestKafkaOffsetGen.java | 26 ++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 71fe7a7629a..6274f838f84 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.config.KafkaSourceConfig; @@ -174,6 +175,15 @@ public class KafkaOffsetGen { } } } + // We need to ensure every partition is part of returned offset ranges even if we are not consuming any new msgs (for instance, if it's already caught up). + // as this will be tracked as the checkpoint, we need to ensure all partitions are part of final ranges. 
+ Map<TopicPartition, List<OffsetRange>> missedRanges = fromOffsetMap.entrySet().stream() + .filter((kv) -> !finalRanges.containsKey(kv.getKey())) + .map((kv) -> Pair.of(kv.getKey(), Collections.singletonList( + OffsetRange.create(kv.getKey(), kv.getValue(), kv.getValue())))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + finalRanges.putAll(missedRanges); + OffsetRange[] sortedRangeArray = finalRanges.values().stream().flatMap(Collection::stream) .sorted(SORT_BY_PARTITION).toArray(OffsetRange[]::new); if (actualNumEvents == 0) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index fc3ab90a036..ba85f04ebcb 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -164,6 +164,32 @@ public class TestKafkaOffsetGen { assertEquals(249, nextOffsetRanges[1].fromOffset()); assertEquals(399, nextOffsetRanges[1].untilOffset()); + // try w/ 1 partition already exhausted. 
both partitions need to be returned as part of offset ranges + lastCheckpointString = testTopicName + ",0:400,1:500"; + kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString); + nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics); + assertEquals(3, nextOffsetRanges.length); + assertEquals(400, nextOffsetRanges[0].fromOffset()); + assertEquals(450, nextOffsetRanges[0].untilOffset()); + assertEquals(450, nextOffsetRanges[1].fromOffset()); + assertEquals(500, nextOffsetRanges[1].untilOffset()); + assertEquals(0, nextOffsetRanges[1].partition()); + assertEquals(500, nextOffsetRanges[2].fromOffset()); + assertEquals(500, nextOffsetRanges[2].untilOffset()); + assertEquals(1, nextOffsetRanges[2].partition()); + + // if there is just 1 msg to consume from just 1 partition. + lastCheckpointString = testTopicName + ",0:499,1:500"; + kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString); + nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics); + assertEquals(2, nextOffsetRanges.length); + assertEquals(499, nextOffsetRanges[0].fromOffset()); + assertEquals(500, nextOffsetRanges[0].untilOffset()); + assertEquals(0, nextOffsetRanges[0].partition()); + assertEquals(500, nextOffsetRanges[1].fromOffset()); + assertEquals(500, nextOffsetRanges[1].untilOffset()); + assertEquals(1, nextOffsetRanges[1].partition()); + // committed offsets are not present for the consumer group kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string")); nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
