This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ff52597a078c7b6302bc6fb4a2b6b11f4b3bea61
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 5 09:32:46 2023 +0800

    [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
    
    We received several bug reports since #7620 (for example, #8060). This patch 
reverts the changes to CkpMetadata and always reports the write metadata events 
for the write task; the coordinator decides whether to re-commit these 
metadata stats.
---
 .../sink/common/AbstractStreamWriteFunction.java   |  7 ++++--
 .../org/apache/hudi/sink/meta/CkpMetadata.java     | 13 +++++++----
 .../org/apache/hudi/sink/meta/TestCkpMetadata.java | 27 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index fa4c3db86ea..04cab0b1eff 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.common;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * Base infrastructures for streaming writer function.
@@ -194,8 +194,11 @@ public abstract class AbstractStreamWriteFunction<I>
 
   private void restoreWriteMetadata() throws Exception {
     boolean eventSent = false;
+    HoodieTimeline pendingTimeline = 
this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(this.currentInstant, event.getInstantTime())) {
+      // Must filter out the completed instants in case it is a partial 
failover,
+      // the write status should not be accumulated in such case.
+      if (pendingTimeline.containsInstant(event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
         // The checkpoint succeed but the meta does not commit,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 155f0e6905e..200577428fc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.meta;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
@@ -90,14 +91,13 @@ public class CkpMetadata implements Serializable {
   // -------------------------------------------------------------------------
 
   /**
-   * Initialize the message bus, would keep all the messages.
+   * Initialize the message bus, would clean all the messages
    *
    * <p>This expects to be called by the driver.
    */
   public void bootstrap() throws IOException {
-    if (!fs.exists(path)) {
-      fs.mkdirs(path);
-    }
+    fs.delete(path, true);
+    fs.mkdirs(path);
   }
 
   public void startInstant(String instant) {
@@ -194,6 +194,11 @@ public class CkpMetadata implements Serializable {
     return this.messages.stream().anyMatch(ckpMsg -> 
instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
   }
 
+  @VisibleForTesting
+  public List<String> getInstantCache() {
+    return this.instantCache;
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index fe7ce3f9478..041cd12c97a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -30,11 +30,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * Test cases for {@link CkpMetadata}.
@@ -74,4 +77,28 @@ public class TestCkpMetadata {
     metadata.abortInstant("7");
     assertThat(metadata.getMessages().size(), is(5));
   }
+
+  @Test
+  void testBootstrap() throws Exception {
+    CkpMetadata metadata = getCkpMetadata("");
+    // write 4 instants to the ckp_meta
+    IntStream.range(0, 4).forEach(i -> metadata.startInstant(i + ""));
+    assertThat("The first instant should be removed from the instant cache",
+        metadata.getInstantCache(), is(Arrays.asList("1", "2", "3")));
+
+    // simulate the reboot of coordinator
+    CkpMetadata metadata1 = getCkpMetadata("");
+    metadata1.bootstrap();
+    assertNull(metadata1.getInstantCache(), "The instant cache should be 
recovered from bootstrap");
+
+    metadata1.startInstant("4");
+    assertThat("The first instant should be removed from the instant cache",
+        metadata1.getInstantCache(), is(Collections.singletonList("4")));
+  }
+
+  private CkpMetadata getCkpMetadata(String uniqueId) {
+    String basePath = tempFile.getAbsolutePath();
+    FileSystem fs = FSUtils.getFs(basePath, 
HadoopConfigurations.getHadoopConf(new Configuration()));
+    return CkpMetadata.getInstance(fs, basePath);
+  }
 }

Reply via email to