This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 51d5346d3db3b57f531edcbc366c4cca39b064e3 Author: Y Ethan Guo <[email protected]> AuthorDate: Wed Apr 10 03:03:45 2024 -0700 [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound (#10987) * [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound * Adjust test --- .../utilities/sources/helpers/KafkaOffsetGen.java | 29 +++++++++++++++------- .../utilities/sources/BaseTestKafkaSource.java | 16 ++++++------ 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 442046cd948..71fe7a7629a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -331,24 +331,35 @@ public class KafkaOffsetGen { /** * Fetch checkpoint offsets for each partition. - * @param consumer instance of {@link KafkaConsumer} to fetch offsets from. + * + * @param consumer instance of {@link KafkaConsumer} to fetch offsets from. * @param lastCheckpointStr last checkpoint string. - * @param topicPartitions set of topic partitions. + * @param topicPartitions set of topic partitions. * @return a map of Topic partitions to offsets. */ private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer, - Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) { + Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) { Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions); Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); - boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream() - .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); + List<TopicPartition> outOfBoundPartitionList = checkpointOffsets.entrySet().stream() + .filter(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + boolean isCheckpointOutOfBounds = !outOfBoundPartitionList.isEmpty(); + if (isCheckpointOutOfBounds) { + String outOfBoundOffsets = outOfBoundPartitionList.stream() + .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p) + + ",earliestOffset=" + earliestOffsets.get(p) + "}") + .collect(Collectors.joining(",")); + String message = "Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. " + + "Kafka partitions that have out-of-bound checkpoints: " + outOfBoundOffsets + " ."; + if (getBooleanWithAltKeys(this.props, KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) { - throw new HoodieStreamerException("Some data may have been lost because they are not available in Kafka any more;" - + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed."); + throw new HoodieStreamerException(message); } else { - LOG.warn("Some data may have been lost because they are not available in Kafka any more;" - + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed." + LOG.warn(message + " If you want Hudi Streamer to fail on such cases, set \"" + KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\"."); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java index c5fc7bfaafa..e45d10e7a61 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java @@ -53,6 +53,7 @@ import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -254,7 +255,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss"; Properties topicConfig = new Properties(); topicConfig.setProperty("retention.ms", "8000"); - testUtils.createTopic(topic, 1, topicConfig); + testUtils.createTopic(topic, 2, topicConfig); TypedProperties failOnDataLossProps = createPropsForKafkaSource(topic, null, "earliest"); failOnDataLossProps.setProperty(KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true)); @@ -269,17 +270,14 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { Throwable t = assertThrows(HoodieStreamerException.class, () -> { kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); }); - assertEquals( - "Some data may have been lost because they are not available in Kafka any more;" - + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.", - t.getMessage()); + String errorMessagePrefix = "Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. " + + "Kafka partitions that have out-of-bound checkpoints:"; + assertTrue(t.getMessage().startsWith(errorMessagePrefix)); t = assertThrows(HoodieStreamerException.class, () -> { kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); }); - assertEquals( - "Some data may have been lost because they are not available in Kafka any more;" - + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.", - t.getMessage()); + assertTrue(t.getMessage().startsWith(errorMessagePrefix)); } @Test
