This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f19f6e7f54 [minor] Refactor the code for CkpMetadata (#7166)
f19f6e7f54 is described below

commit f19f6e7f544f3eb4fdf71ad292743ef4a79483ef
Author: Danny Chan <[email protected]>
AuthorDate: Wed Nov 9 17:36:13 2022 +0800

    [minor] Refactor the code for CkpMetadata (#7166)
---
 .../main/java/org/apache/hudi/sink/meta/CkpMetadata.java    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index c20b263fa3..6d0174069f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -61,7 +61,9 @@ public class CkpMetadata implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
 
-  protected static final int MAX_RETAIN_CKP_NUM = 3;
+  // 1 is actually enough for fetching the latest pending instant,
+  // keep 3 instants here for purpose of debugging.
+  private static final int MAX_RETAIN_CKP_NUM = 3;
 
   // the ckp metadata directory
   private static final String CKP_META = "ckp_meta";
@@ -106,15 +108,20 @@ public class CkpMetadata implements Serializable {
     } catch (IOException e) {
       throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
     }
+    // cache the instant
+    cache(instant);
     // cleaning
-    clean(instant);
+    clean();
   }
 
-  private void clean(String newInstant) {
+  private void cache(String newInstant) {
     if (this.instantCache == null) {
       this.instantCache = new ArrayList<>();
     }
     this.instantCache.add(newInstant);
+  }
+
+  private void clean() {
     if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
       final String instant = instantCache.get(0);
       boolean[] error = new boolean[1];

Reply via email to