This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0bcc417be [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound (#10987)
ee0bcc417be is described below

commit ee0bcc417beb076d0be009d0d57c9575da4caacd
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Apr 10 03:03:45 2024 -0700

    [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound (#10987)
    
    * [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound
    
    * Adjust test
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 29 +++++++++++++++-------
 .../utilities/sources/BaseTestKafkaSource.java     | 16 ++++++------
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 442046cd948..71fe7a7629a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -331,24 +331,35 @@ public class KafkaOffsetGen {
 
   /**
    * Fetch checkpoint offsets for each partition.
-   * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
+   *
+   * @param consumer          instance of {@link KafkaConsumer} to fetch 
offsets from.
    * @param lastCheckpointStr last checkpoint string.
-   * @param topicPartitions set of topic partitions.
+   * @param topicPartitions   set of topic partitions.
    * @return a map of Topic partitions to offsets.
    */
   private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
-                                                        Option<String> 
lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+                                                      Option<String> 
lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = 
consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = 
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
-        .anyMatch(offset -> offset.getValue() < 
earliestOffsets.get(offset.getKey()));
+    List<TopicPartition> outOfBoundPartitionList = 
checkpointOffsets.entrySet().stream()
+        .filter(offset -> offset.getValue() < 
earliestOffsets.get(offset.getKey()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    boolean isCheckpointOutOfBounds = !outOfBoundPartitionList.isEmpty();
+
     if (isCheckpointOutOfBounds) {
+      String outOfBoundOffsets = outOfBoundPartitionList.stream()
+          .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p)
+              + ",earliestOffset=" + earliestOffsets.get(p) + "}")
+          .collect(Collectors.joining(","));
+      String message = "Some data may have been lost because they are not 
available in Kafka any more;"
+          + " either the data was aged out by Kafka or the topic may have been 
deleted before all the data in the topic was processed. "
+          + "Kafka partitions that have out-of-bound checkpoints: " + 
outOfBoundOffsets + " .";
+
       if (getBooleanWithAltKeys(this.props, 
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
-        throw new HoodieStreamerException("Some data may have been lost 
because they are not available in Kafka any more;"
-            + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed.");
+        throw new HoodieStreamerException(message);
       } else {
-        LOG.warn("Some data may have been lost because they are not available 
in Kafka any more;"
-            + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed."
+        LOG.warn(message
             + " If you want Hudi Streamer to fail on such cases, set \"" + 
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
       }
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index c5fc7bfaafa..e45d10e7a61 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -53,6 +53,7 @@ import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -254,7 +255,7 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
     final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
     Properties topicConfig = new Properties();
     topicConfig.setProperty("retention.ms", "8000");
-    testUtils.createTopic(topic, 1, topicConfig);
+    testUtils.createTopic(topic, 2, topicConfig);
 
     TypedProperties failOnDataLossProps = createPropsForKafkaSource(topic, 
null, "earliest");
     
failOnDataLossProps.setProperty(KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key(),
 Boolean.toString(true));
@@ -269,17 +270,14 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
     Throwable t = assertThrows(HoodieStreamerException.class, () -> {
       
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
     });
-    assertEquals(
-        "Some data may have been lost because they are not available in Kafka 
any more;"
-            + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed.",
-        t.getMessage());
+    String errorMessagePrefix = "Some data may have been lost because they are 
not available in Kafka any more;"
+        + " either the data was aged out by Kafka or the topic may have been 
deleted before all the data in the topic was processed. "
+        + "Kafka partitions that have out-of-bound checkpoints:";
+    assertTrue(t.getMessage().startsWith(errorMessagePrefix));
     t = assertThrows(HoodieStreamerException.class, () -> {
       
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
     });
-    assertEquals(
-        "Some data may have been lost because they are not available in Kafka 
any more;"
-            + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed.",
-        t.getMessage());
+    assertTrue(t.getMessage().startsWith(errorMessagePrefix));
   }
 
   @Test

Reply via email to