This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f19f6e7f54 [minor] Refactor the code for CkpMetadata (#7166)
f19f6e7f54 is described below
commit f19f6e7f544f3eb4fdf71ad292743ef4a79483ef
Author: Danny Chan <[email protected]>
AuthorDate: Wed Nov 9 17:36:13 2022 +0800
[minor] Refactor the code for CkpMetadata (#7166)
---
.../main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index c20b263fa3..6d0174069f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -61,7 +61,9 @@ public class CkpMetadata implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
- protected static final int MAX_RETAIN_CKP_NUM = 3;
+ // 1 is actually enough for fetching the latest pending instant,
+ // keep 3 instants here for purpose of debugging.
+ private static final int MAX_RETAIN_CKP_NUM = 3;
// the ckp metadata directory
private static final String CKP_META = "ckp_meta";
@@ -106,15 +108,20 @@ public class CkpMetadata implements Serializable {
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start
metadata for instant: " + instant, e);
}
+ // cache the instant
+ cache(instant);
// cleaning
- clean(instant);
+ clean();
}
- private void clean(String newInstant) {
+ private void cache(String newInstant) {
if (this.instantCache == null) {
this.instantCache = new ArrayList<>();
}
this.instantCache.add(newInstant);
+ }
+
+ private void clean() {
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
final String instant = instantCache.get(0);
boolean[] error = new boolean[1];