xccui commented on code in PR #8374:
URL: https://github.com/apache/hudi/pull/8374#discussion_r1157359620


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java:
##########
@@ -72,6 +76,24 @@ void testWriteAndReadMessage(String uniqueId) {
     assertThat(metadata.getMessages().size(), is(5));
   }
 
+  @Test
+  void testBootstrap() throws Exception {
+    CkpMetadata metadata = getCkpMetadata("");
+    // write 4 instants to the ckp_meta
+    IntStream.range(0, 4).forEach(i -> metadata.startInstant(i + ""));
+    assertThat("The first instant should be removed from the instant cache",
+        metadata.getInstantCache(), is(Arrays.asList("1", "2", "3")));
+
+    // simulate the reboot of coordinator
+    CkpMetadata metadata1 = getCkpMetadata("");
+    metadata1.bootstrap();
+    assertNull(metadata1.getInstantCache(), "The instant cache should be 
recovered from bootstrap");
+
+    metadata1.startInstant("4");
+    assertThat("The first instant should be removed from the instant cache",

Review Comment:
   The message should be fixed



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java:
##########
@@ -203,6 +203,11 @@ public boolean isAborted(String instant) {
     return this.messages.stream().anyMatch(ckpMsg -> 
instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
   }
 
+  @VisibleForTesting
+  public List<String> getInstantCache() {
+    return this.instantCache;

Review Comment:
   I suppose we can reduce an immutable list here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to