This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f957755a [HUDI-5223] Partial failover for flink (#7208)
d3f957755a is described below

commit d3f957755abf76c64ff06fac6d857cba9bdbbacf
Author: Danny Chan <[email protected]>
AuthorDate: Wed Nov 16 14:47:38 2022 +0800

    [HUDI-5223] Partial failover for flink (#7208)
    
    Before this patch, when a partial failover occurred among the write tasks,
the recovered write task initialized its current instant to the latest inflight
instant and then waited for a new instant to write with, so it hung and failed
over repeatedly.
    
    For a task recovered from a failover (attempt number greater than 0),
the latest inflight instant can actually be reused; the intermediate data files
can be cleaned up using MARKER files after the commit.
---
 .../src/main/java/org/apache/hudi/io/FlinkMergeHandle.java   |  8 +-------
 .../apache/hudi/sink/common/AbstractStreamWriteFunction.java | 12 ++++++++++--
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 69121a9a04..a44783f99e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -143,13 +143,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
           break;
         }
 
-        // Override the old file name,
-        // In rare cases, when a checkpoint was aborted and the instant time
-        // is reused, the merge handle generates a new file name
-        // with the reused instant time of last checkpoint, which is duplicate,
-        // use the same name file as new base file in case data loss.
-        oldFilePath = newFilePath;
-        rolloverPaths.add(oldFilePath);
+        rolloverPaths.add(newFilePath);
         newFileName = newFileNameWithRollover(rollNumber++);
         newFilePath = makeNewFilePath(partitionPath, newFileName);
         LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath 
+ ", rolls over to new path: " + newFilePath);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 674cd3588a..1f23946184 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -192,10 +192,9 @@ public abstract class AbstractStreamWriteFunction<I>
   // -------------------------------------------------------------------------
 
   private void restoreWriteMetadata() throws Exception {
-    String lastInflight = lastPendingInstant();
     boolean eventSent = false;
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(lastInflight, event.getInstantTime())) {
+      if (Objects.equals(this.currentInstant, event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
         // The checkpoint succeed but the meta does not commit,
@@ -211,6 +210,15 @@ public abstract class AbstractStreamWriteFunction<I>
   }
 
   private void sendBootstrapEvent() {
+    int attemptId = getRuntimeContext().getAttemptNumber();
+    if (attemptId > 0) {
+      // either a partial or global failover, reuses the current inflight 
instant
+      if (this.currentInstant != null) {
+        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", 
taskID, this.currentInstant, attemptId);
+        this.currentInstant = null;
+      }
+      return;
+    }
     
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", 
taskID);
   }

Reply via email to