codope commented on code in PR #10987:
URL: https://github.com/apache/hudi/pull/10987#discussion_r1558838076
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -331,24 +331,35 @@ private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String t
/**
* Fetch checkpoint offsets for each partition.
- * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
+ *
+ * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
* @param lastCheckpointStr last checkpoint string.
- * @param topicPartitions set of topic partitions.
+ * @param topicPartitions set of topic partitions.
* @return a map of Topic partitions to offsets.
*/
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
- Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+ Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
- boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
- .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
+ List<TopicPartition> outOfBoundPartitionList = checkpointOffsets.entrySet().stream()
+ .filter(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ boolean isCheckpointOutOfBounds = !outOfBoundPartitionList.isEmpty();
+
if (isCheckpointOutOfBounds) {
+ String outOfBoundOffsets = outOfBoundPartitionList.stream()
+ .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p)
+ + ",earliestOffset=" + earliestOffsets.get(p) + "}")
+ .collect(Collectors.joining(","));
+ String message = "Some data may have been lost because they are not available in Kafka any more;"
+ + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. "
+ + "Kafka partitions that have out-of-bound checkpoints: " + outOfBoundOffsets + " .";
+
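For reference, the detection and formatting logic added in this hunk can be exercised without a Kafka cluster using the small sketch below. It assumes partitions are identified by plain strings such as "events-0" instead of Kafka's TopicPartition, and the class name OutOfBoundCheckpoints is hypothetical. Like the diff, it calls get() on the earliest-offsets map without a null check, so every checkpointed partition is expected to have an earliest offset.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the logic in the diff. Partitions are plain String keys
// such as "events-0" (an assumption; the real code uses Kafka's TopicPartition).
final class OutOfBoundCheckpoints {

  private OutOfBoundCheckpoints() {}

  // Returns the partitions whose checkpointed offset is below the earliest offset
  // Kafka still retains. Like the diff, this assumes every checkpointed partition
  // has an entry in earliestOffsets (a missing entry would fail on unboxing).
  static List<String> outOfBoundPartitions(Map<String, Long> checkpointOffsets,
                                           Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        .filter(e -> e.getValue() < earliestOffsets.get(e.getKey()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  // Formats each partition as partition:{checkpoint=...,earliestOffset=...},
  // joined with commas, mirroring the string assembled in the diff.
  static String describe(List<String> partitions,
                         Map<String, Long> checkpointOffsets,
                         Map<String, Long> earliestOffsets) {
    return partitions.stream()
        .map(p -> p + ":{checkpoint=" + checkpointOffsets.get(p)
            + ",earliestOffset=" + earliestOffsets.get(p) + "}")
        .collect(Collectors.joining(","));
  }
}
```

With a checkpoint of 5 for events-0 and an earliest retained offset of 10, events-0 is reported, while a partition whose checkpoint is still at or above its earliest offset is not.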
Review Comment:
Can we make the error message less verbose and extract it to a constant?
Something like a static error code KAFKA_OFFSET_OUT_OF_BOUND_CHECKPOINT:
outOfBoundOffsets? Then the test can just assert that this string literal is
present in the error message.
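A minimal sketch of what this suggestion could look like, assuming a hypothetical holder class KafkaOffsetErrors (not an existing Hudi class): the error code lives in a constant, and the message shrinks to the code followed by the formatted out-of-bound offsets.

```java
// Hypothetical holder for the error code suggested in the review; the class and
// method names are illustrative only, not existing Hudi APIs.
final class KafkaOffsetErrors {

  private KafkaOffsetErrors() {}

  // Stable error code that tests can search for instead of matching the full sentence.
  static final String KAFKA_OFFSET_OUT_OF_BOUND_CHECKPOINT = "KAFKA_OFFSET_OUT_OF_BOUND_CHECKPOINT";

  // Builds the short message "<code>: <outOfBoundOffsets>".
  static String outOfBoundMessage(String outOfBoundOffsets) {
    return KAFKA_OFFSET_OUT_OF_BOUND_CHECKPOINT + ": " + outOfBoundOffsets;
  }
}
```

A test could then check only that the exception message contains KafkaOffsetErrors.KAFKA_OFFSET_OUT_OF_BOUND_CHECKPOINT, so rewording the human-readable explanation later would not break it.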
--
This is an automated message from the Apache Git Service.