This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 56c4937764461d3506fb7eb8555948a0acedca20
Author: Sampan S Nayak <[email protected]>
AuthorDate: Sat Mar 2 01:38:26 2024 +0530

    [HUDI-7462] Refactor checkTopicCheckpoint in KafkaOffsetGen for reusability (#10794)
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 57f5d38dd7c..9b1f8674ca8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -59,6 +59,7 @@ import static org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils.checkTopicCheckpoint;
 
 /**
  * Source to read data from Kafka, incrementally.
@@ -68,16 +69,14 @@ public class KafkaOffsetGen {
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetGen.class);
   private static final String METRIC_NAME_KAFKA_DELAY_COUNT = 
"kafkaDelayCount";
   private static final Comparator<OffsetRange> SORT_BY_PARTITION = 
Comparator.comparing(OffsetRange::partition);
-
   public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
 
-  /**
-   * kafka checkpoint Pattern.
-   * Format: topic_name,partition_num:offset,partition_num:offset,....
-   */
-  private final Pattern pattern = Pattern.compile(".*,.*:.*");
-
   public static class CheckpointUtils {
+    /**
+     * kafka checkpoint Pattern.
+     * Format: topic_name,partition_num:offset,partition_num:offset,....
+     */
+    private static final Pattern PATTERN = Pattern.compile(".*,.*:.*");
 
     /**
      * Reconstruct checkpoint from timeline.
@@ -210,6 +209,11 @@ public class KafkaOffsetGen {
     public static long totalNewMessages(OffsetRange[] ranges) {
       return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
     }
+
+    public static boolean checkTopicCheckpoint(Option<String> 
lastCheckpointStr) {
+      Matcher matcher = PATTERN.matcher(lastCheckpointStr.get());
+      return matcher.matches();
+    }
   }
 
   private final Map<String, Object> kafkaParams;
@@ -425,11 +429,6 @@ public class KafkaOffsetGen {
     return result.containsKey(topicName);
   }
 
-  private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
-    Matcher matcher = pattern.matcher(lastCheckpointStr.get());
-    return matcher.matches();
-  }
-
   public String getTopicName() {
     return topicName;
   }

Reply via email to