This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe9f969289 [HUDI-6588] Fix duplicate fileId on TM partial-failover and recovery (#9357)
6fe9f969289 is described below

commit 6fe9f969289c8dd9eaeb843a8f10623f03ab5b59
Author: voonhous <[email protected]>
AuthorDate: Fri Aug 4 16:37:42 2023 +0800

    [HUDI-6588] Fix duplicate fileId on TM partial-failover and recovery (#9357)
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../hudi/sink/append/AppendWriteFunction.java      | 15 +++++++++++++
 .../sink/common/AbstractStreamWriteFunction.java   | 12 +----------
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  9 ++++++++
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  5 +++++
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |  5 +++++
 .../hudi/sink/utils/InsertFunctionWrapper.java     | 25 +++++++++++++++++-----
 6 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index e1db125731c..91c59341109 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -95,6 +95,21 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     this.writeStatuses.clear();
   }
 
+  protected void sendBootstrapEvent() {
+    int attemptId = getRuntimeContext().getAttemptNumber();
+    if (attemptId > 0) {
+      // either a partial or global failover, reuses the current inflight instant
+      if (this.currentInstant != null) {
+        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
+        this.currentInstant = null;
+        return;
+      }
+      // the JM may have also been rebooted, sends the bootstrap event either
+    }
+    this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
+    LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+  }
+
   // -------------------------------------------------------------------------
   //  GetterSetter
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index cbb8851a940..3bd19fa0699 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -213,17 +213,7 @@ public abstract class AbstractStreamWriteFunction<I>
     }
   }
 
-  private void sendBootstrapEvent() {
-    int attemptId = getRuntimeContext().getAttemptNumber();
-    if (attemptId > 0) {
-      // either a partial or global failover, reuses the current inflight instant
-      if (this.currentInstant != null) {
-        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
-        this.currentInstant = null;
-        return;
-      }
-      // the JM may have also been rebooted, sends the bootstrap event either
-    }
+  protected void sendBootstrapEvent() {
     this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index ea8b8b75ce5..90aa86cd353 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -141,6 +141,15 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .jobFailover()
         .assertNextEvent()
         .checkLastPendingInstantCompleted()
+        .end();
+  }
+
+  @Test
+  public void testPartialFailover() throws Exception {
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
+    conf.setString(FlinkOptions.OPERATION, "INSERT");
+    // open the function and ingest data
+    preparePipeline()
         // triggers subtask failure for multiple times to simulate partial failover, for partial over,
         // we allow the task to reuse the pending instant for data flushing, no metadata event should be sent
         .subTaskFails(0, 1)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index d968362ca2a..94214661ea8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -143,6 +143,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testPartialFailover() {
+    // partial failover is only valid for append mode.
+  }
+
   @Test
   public void testInsertAppendMode() {
     // append mode is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index f6ee44db254..abc5679367a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -38,6 +38,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
+  @Test
+  public void testPartialFailover() {
+    // partial failover is only valid for append mode.
+  }
+
   @Test
   public void testInsertAppendMode() {
     // append mode is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 5778a0375e9..cb144e92ba0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -57,7 +56,7 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private final Configuration conf;
   private final RowType rowType;
 
-  private final StreamingRuntimeContext runtimeContext;
+  private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private final StreamWriteOperatorCoordinator coordinator;
@@ -102,6 +101,8 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
 
     setupWriteFunction();
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
 
     if (asyncClustering) {
       clusteringFunctionWrapper.openFunction();
@@ -145,6 +146,23 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     }
   }
 
+  public void coordinatorFails() throws Exception {
+    this.coordinator.close();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public void subTaskFails(int taskID, int attemptNumber) throws Exception {
+    coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+    // reset the attempt number to simulate the task failover/retries
+    this.runtimeContext.setAttemptNumber(attemptNumber);
+    setupWriteFunction();
+  }
+
   public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
@@ -171,8 +189,5 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
-
-    // handle the bootstrap event
-    coordinator.handleEventFromOperator(0, getNextEvent());
   }
 }

Reply via email to