This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 56c4937764461d3506fb7eb8555948a0acedca20 Author: Sampan S Nayak <[email protected]> AuthorDate: Sat Mar 2 01:38:26 2024 +0530 [HUDI-7462] Refactor checkTopicCheckpoint in KafkaOffsetGen for reusability (#10794) --- .../utilities/sources/helpers/KafkaOffsetGen.java | 23 +++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 57f5d38dd7c..9b1f8674ca8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -59,6 +59,7 @@ import static org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils.checkTopicCheckpoint; /** * Source to read data from Kafka, incrementally. @@ -68,16 +69,14 @@ public class KafkaOffsetGen { private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class); private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount"; private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparing(OffsetRange::partition); - public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp"; - /** - * kafka checkpoint Pattern. - * Format: topic_name,partition_num:offset,partition_num:offset,.... - */ - private final Pattern pattern = Pattern.compile(".*,.*:.*"); - public static class CheckpointUtils { + /** + * kafka checkpoint Pattern. + * Format: topic_name,partition_num:offset,partition_num:offset,.... + */ + private static final Pattern PATTERN = Pattern.compile(".*,.*:.*"); /** * Reconstruct checkpoint from timeline. @@ -210,6 +209,11 @@ public class KafkaOffsetGen { public static long totalNewMessages(OffsetRange[] ranges) { return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum(); } + + public static boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) { + Matcher matcher = PATTERN.matcher(lastCheckpointStr.get()); + return matcher.matches(); + } } private final Map<String, Object> kafkaParams; @@ -425,11 +429,6 @@ public class KafkaOffsetGen { return result.containsKey(topicName); } - private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) { - Matcher matcher = pattern.matcher(lastCheckpointStr.get()); - return matcher.matches(); - } - public String getTopicName() { return topicName; }
