This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 63a3e2fe28ead18a388b0873e26a2f8ebc285e69
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Apr 25 21:19:24 2024 -0700

    [HUDI-7511] Fixing offset range calculation for kafka (#10875)
    
    Co-authored-by: Balaji Varadarajan <[email protected]>
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 10 +++++++++
 .../sources/helpers/TestKafkaOffsetGen.java        | 26 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 71fe7a7629a..6274f838f84 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
@@ -174,6 +175,15 @@ public class KafkaOffsetGen {
           }
         }
       }
+      // We need to ensure every partition is part of the returned offset ranges even if we are not consuming any new msgs (for instance, if it's already caught up).
+      // As this will be tracked as the checkpoint, we need to ensure all partitions are part of the final ranges.
+      Map<TopicPartition, List<OffsetRange>> missedRanges = fromOffsetMap.entrySet().stream()
+              .filter((kv) -> !finalRanges.containsKey(kv.getKey()))
+              .map((kv) -> Pair.of(kv.getKey(), Collections.singletonList(
+                      OffsetRange.create(kv.getKey(), kv.getValue(), kv.getValue()))))
+              .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+      finalRanges.putAll(missedRanges);
+
       OffsetRange[] sortedRangeArray = 
finalRanges.values().stream().flatMap(Collection::stream)
           .sorted(SORT_BY_PARTITION).toArray(OffsetRange[]::new);
       if (actualNumEvents == 0) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index fc3ab90a036..ba85f04ebcb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -164,6 +164,32 @@ public class TestKafkaOffsetGen {
     assertEquals(249, nextOffsetRanges[1].fromOffset());
     assertEquals(399, nextOffsetRanges[1].untilOffset());
 
+    // Try w/ 1 partition already exhausted. Both partitions need to be returned as part of the offset ranges.
+    lastCheckpointString = testTopicName + ",0:400,1:500";
+    kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(3, nextOffsetRanges.length);
+    assertEquals(400, nextOffsetRanges[0].fromOffset());
+    assertEquals(450, nextOffsetRanges[0].untilOffset());
+    assertEquals(450, nextOffsetRanges[1].fromOffset());
+    assertEquals(500, nextOffsetRanges[1].untilOffset());
+    assertEquals(0, nextOffsetRanges[1].partition());
+    assertEquals(500, nextOffsetRanges[2].fromOffset());
+    assertEquals(500, nextOffsetRanges[2].untilOffset());
+    assertEquals(1, nextOffsetRanges[2].partition());
+
+    // If there is just 1 msg to consume from just 1 partition.
+    lastCheckpointString = testTopicName + ",0:499,1:500";
+    kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(499, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(500, nextOffsetRanges[1].fromOffset());
+    assertEquals(500, nextOffsetRanges[1].untilOffset());
+    assertEquals(1, nextOffsetRanges[1].partition());
+
     // committed offsets are not present for the consumer group
     kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);

Reply via email to