This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ee0bcc417be [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound (#10987)
ee0bcc417be is described below
commit ee0bcc417beb076d0be009d0d57c9575da4caacd
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Apr 10 03:03:45 2024 -0700
[HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound (#10987)
* [HUDI-7597] Add logs of Kafka offsets when the checkpoint is out of bound
* Adjust test
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 29 +++++++++++++++-------
.../utilities/sources/BaseTestKafkaSource.java | 16 ++++++------
2 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 442046cd948..71fe7a7629a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -331,24 +331,35 @@ public class KafkaOffsetGen {
/**
* Fetch checkpoint offsets for each partition.
- * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
+ *
+ * @param consumer instance of {@link KafkaConsumer} to fetch
offsets from.
* @param lastCheckpointStr last checkpoint string.
- * @param topicPartitions set of topic partitions.
+ * @param topicPartitions set of topic partitions.
* @return a map of Topic partitions to offsets.
*/
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
- Option<String>
lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+ Option<String>
lastCheckpointStr, Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Long> earliestOffsets =
consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
- boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
- .anyMatch(offset -> offset.getValue() <
earliestOffsets.get(offset.getKey()));
+ List<TopicPartition> outOfBoundPartitionList =
checkpointOffsets.entrySet().stream()
+ .filter(offset -> offset.getValue() <
earliestOffsets.get(offset.getKey()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ boolean isCheckpointOutOfBounds = !outOfBoundPartitionList.isEmpty();
+
if (isCheckpointOutOfBounds) {
+ String outOfBoundOffsets = outOfBoundPartitionList.stream()
+ .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p)
+ + ",earliestOffset=" + earliestOffsets.get(p) + "}")
+ .collect(Collectors.joining(","));
+ String message = "Some data may have been lost because they are not
available in Kafka any more;"
+ + " either the data was aged out by Kafka or the topic may have been
deleted before all the data in the topic was processed. "
+ + "Kafka partitions that have out-of-bound checkpoints: " +
outOfBoundOffsets + " .";
+
if (getBooleanWithAltKeys(this.props,
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
- throw new HoodieStreamerException("Some data may have been lost
because they are not available in Kafka any more;"
- + " either the data was aged out by Kafka or the topic may have
been deleted before all the data in the topic was processed.");
+ throw new HoodieStreamerException(message);
} else {
- LOG.warn("Some data may have been lost because they are not available
in Kafka any more;"
- + " either the data was aged out by Kafka or the topic may have
been deleted before all the data in the topic was processed."
+ LOG.warn(message
+ " If you want Hudi Streamer to fail on such cases, set \"" +
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index c5fc7bfaafa..e45d10e7a61 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -53,6 +53,7 @@ import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -254,7 +255,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
Properties topicConfig = new Properties();
topicConfig.setProperty("retention.ms", "8000");
- testUtils.createTopic(topic, 1, topicConfig);
+ testUtils.createTopic(topic, 2, topicConfig);
TypedProperties failOnDataLossProps = createPropsForKafkaSource(topic,
null, "earliest");
failOnDataLossProps.setProperty(KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key(),
Boolean.toString(true));
@@ -269,17 +270,14 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
Throwable t = assertThrows(HoodieStreamerException.class, () -> {
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()),
Long.MAX_VALUE);
});
- assertEquals(
- "Some data may have been lost because they are not available in Kafka
any more;"
- + " either the data was aged out by Kafka or the topic may have
been deleted before all the data in the topic was processed.",
- t.getMessage());
+ String errorMessagePrefix = "Some data may have been lost because they are
not available in Kafka any more;"
+ + " either the data was aged out by Kafka or the topic may have been
deleted before all the data in the topic was processed. "
+ + "Kafka partitions that have out-of-bound checkpoints:";
+ assertTrue(t.getMessage().startsWith(errorMessagePrefix));
t = assertThrows(HoodieStreamerException.class, () -> {
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()),
Long.MAX_VALUE);
});
- assertEquals(
- "Some data may have been lost because they are not available in Kafka
any more;"
- + " either the data was aged out by Kafka or the topic may have
been deleted before all the data in the topic was processed.",
- t.getMessage());
+ assertTrue(t.getMessage().startsWith(errorMessagePrefix));
}
@Test