This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d81f2bba8d1596bb0621d5992b145f2d7b6bde00
Author: wenbingshen <[email protected]>
AuthorDate: Sat Mar 2 08:43:56 2024 +0800

    [HUDI-7447] Fix not bootstrap when subTask restart when OPCoordinator 
handle CheckPointComplete not finished (#10767)
---
 .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 12 ++++++++++++
 .../test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java | 11 ++++++++++-
 .../org/apache/hudi/sink/utils/InsertFunctionWrapper.java    |  9 ++++++++-
 .../apache/hudi/sink/utils/StreamWriteFunctionWrapper.java   |  9 ++++++++-
 .../java/org/apache/hudi/sink/utils/TestFunctionWrapper.java |  8 ++++++++
 .../test/java/org/apache/hudi/sink/utils/TestWriteBase.java  |  4 ++--
 6 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 274091c88ea..8d2cf38ed0a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -435,6 +435,18 @@ public class StreamWriteOperatorCoordinator
           .filter(evt -> evt.getWriteStatuses().size() > 0)
           .findFirst().map(WriteMetadataEvent::getInstantTime)
           .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+
+      // if currentInstant is pending && bootstrap event instant is empty
+      // reuse currentInstant, reject bootstrap
+      if 
(this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant)
+              && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)
+              && this.tableState.operationType == WriteOperationType.INSERT) {
+        LOG.warn("Reuse current pending Instant {} with {} operationType, "
+                + "ignoring empty bootstrap event.", this.instant, 
WriteOperationType.INSERT.value());
+        reset();
+        return;
+      }
+
       initInstant(instant);
     }
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f28dfe31456..83ca930b61d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -144,6 +144,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  // Only a job-level failure with INSERT operationType allows us to roll back the unfinished instant.
+  // On a task-level failure retry, we should reuse the unfinished instant with INSERT operationType.
   @Test
   public void testPartialFailover() throws Exception {
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
@@ -159,7 +161,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .assertNextEvent()
         // if the write task can not fetch any pending instant when starts 
up(the coordinator restarts),
         // it will send an event to the coordinator
-        .coordinatorFails()
+        .restartCoordinator()
         .subTaskFails(0, 2)
         // the subtask can not fetch the instant to write until a new instant 
is initialized
         .checkpointThrows(4, "Timeout(1000ms) while waiting for instant 
initialize")
@@ -168,6 +170,13 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         // the last checkpoint instant was rolled back by subTaskFails(0, 2)
         // with EAGER cleaning strategy
         .assertNoEvent()
+        .checkpoint(4)
+        .assertNextEvent()
+        .subTaskFails(0, 4)
+        // the last checkpoint instant can not be rolled back by subTaskFails(0, 4) with INSERT write operationType
+        // because the last data has been snapshotted by the completed checkpoint but the instant has not been committed,
+        // so we need to re-commit it
+        .assertEmptyEvent()
         .end();
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index cb144e92ba0..15634cc6e72 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -59,7 +59,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
-  private final StreamWriteOperatorCoordinator coordinator;
+  private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
 
   private final boolean asyncClustering;
@@ -152,6 +152,13 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
   }
 
+  public void restartCoordinator() throws Exception {
+    this.coordinator.close();
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+  }
+
   public void checkpointFails(long checkpointId) {
     coordinator.notifyCheckpointAborted(checkpointId);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index cf801bb0d7d..c65e42f1521 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -71,7 +71,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
-  private final StreamWriteOperatorCoordinator coordinator;
+  private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
 
   /**
@@ -227,6 +227,13 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
   }
 
+  public void restartCoordinator() throws Exception {
+    this.coordinator.close();
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+  }
+
   public void checkpointFails(long checkpointId) {
     coordinator.notifyCheckpointAborted(checkpointId);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index 25593d8d2fd..faee168bf25 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -82,6 +82,14 @@ public interface TestFunctionWrapper<I> {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Triggers a job-level failure, so the coordinator needs to be re-created as a new instance.
+   * @throws Exception
+   */
+  default void restartCoordinator() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Returns the operator coordinator.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index dd0db132bf8..0d668cfda5a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -479,8 +479,8 @@ public class TestWriteBase {
      * Used to simulate the use case that the coordinator has not finished a 
new instant initialization,
      * while the write task fails intermittently.
      */
-    public TestHarness coordinatorFails() throws Exception {
-      this.pipeline.coordinatorFails();
+    public TestHarness restartCoordinator() throws Exception {
+      this.pipeline.restartCoordinator();
       return this;
     }
 

Reply via email to