This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 053e9f0b380 [HUDI-6036] Add more tests for task and coordinator 
interaction (#8383)
053e9f0b380 is described below

commit 053e9f0b3804c799beb17c07c81c56d06a1f8c03
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 5 20:30:11 2023 +0800

    [HUDI-6036] Add more tests for task and coordinator interaction (#8383)
---
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 36 +++++++++-
 .../utils/BucketStreamWriteFunctionWrapper.java    |  9 ---
 .../hudi/sink/utils/MockOperatorStateStore.java    |  4 ++
 .../sink/utils/MockStateInitializationContext.java |  2 +-
 .../sink/utils/MockStreamingRuntimeContext.java    | 12 ++++
 .../sink/utils/StreamWriteFunctionWrapper.java     | 23 +++++--
 .../hudi/sink/utils/TestFunctionWrapper.java       | 16 ++++-
 .../org/apache/hudi/sink/utils/TestWriteBase.java  | 79 +++++++++++++++++++++-
 8 files changed, 160 insertions(+), 21 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f705a63f1ff..9f88dd24140 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -67,7 +67,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
    * Override to have custom configuration.
    */
   protected void setUp(Configuration conf) {
-    // for sub-class extension
+    // for subclass extension
   }
 
   @Test
@@ -109,12 +109,46 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
 
   @Test
   public void testSubtaskFails() throws Exception {
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
     // open the function and ingest data
     preparePipeline()
         .checkpoint(1)
         .assertEmptyEvent()
         .subTaskFails(0)
         .noCompleteInstant()
+        // write a commit and check the result
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED1)
+        // triggers task 0 failover, there is no pending instant that needs to 
recommit,
+        // the task sends an empty bootstrap event to trigger initialization 
of a new instant.
+        .subTaskFails(0, 0)
+        .assertEmptyEvent()
+        // rollback the last complete instant to inflight state, to simulate 
an instant commit failure
+        // while executing the post action of a checkpoint success 
notification event, the whole job should then
+        // trigger a failover.
+        .rollbackLastCompleteInstantToInflight()
+        .jobFailover()
+        .assertNextEvent()
+        .checkLastPendingInstantCompleted()
+        // triggers subtask failure multiple times to simulate partial 
failover; during partial failover,
+        // we allow the task to reuse the pending instant for data flushing, 
no metadata event should be sent
+        .subTaskFails(0, 1)
+        .assertNoEvent()
+        // the subtask reuses the pending instant
+        .checkpoint(3)
+        .assertNextEvent()
+        // if the write task cannot fetch any pending instant when it starts 
up (the coordinator restarts),
+        // it will send an event to the coordinator
+        .coordinatorFails()
+        .subTaskFails(0, 2)
+        // the subtask can not fetch the instant to write until a new instant 
is initialized
+        .checkpointThrows(4, "Timeout(1000ms) while waiting for instant 
initialize")
+        .assertEmptyEvent()
+        .subTaskFails(0, 3)
+        .assertEmptyEvent()
         .end();
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index 49d6b4b1e2b..3be484a49be 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -165,15 +165,6 @@ public class BucketStreamWriteFunctionWrapper<I> 
implements TestFunctionWrapper<
     }
   }
 
-  public void checkpointFails(long checkpointId) {
-    coordinator.notifyCheckpointAborted(checkpointId);
-  }
-
-  public void subTaskFails(int taskID) throws Exception {
-    coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
-    setupWriteFunction();
-  }
-
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
index cc16be4f0eb..e0d0188c129 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
@@ -121,6 +121,10 @@ public class MockOperatorStateStore implements 
KeyedStateStore, OperatorStateSto
     lastSuccessStateMap = historyStateMap.get(checkpointId);
   }
 
+  public boolean isRestored() {
+    return !this.currentStateMap.isEmpty();
+  }
+
   public void rollBackToLastSuccessCheckpoint() {
     this.currentStateMap = copyStates(lastSuccessStateMap);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
index 945d1bbbe75..e218f29df6f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
@@ -37,7 +37,7 @@ public class MockStateInitializationContext implements 
StateInitializationContex
 
   @Override
   public boolean isRestored() {
-    return false;
+    return operatorStateStore.isRestored();
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
index 7c5b79700e4..888e349bdd9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
@@ -44,6 +44,8 @@ public class MockStreamingRuntimeContext extends 
StreamingRuntimeContextAdapter
   private final int numParallelSubtasks;
   private final int subtaskIndex;
 
+  private int attemptNumber;
+
   public MockStreamingRuntimeContext(
       boolean isCheckpointingEnabled,
       int numParallelSubtasks,
@@ -66,6 +68,7 @@ public class MockStreamingRuntimeContext extends 
StreamingRuntimeContextAdapter
     this.isCheckpointingEnabled = isCheckpointingEnabled;
     this.numParallelSubtasks = numParallelSubtasks;
     this.subtaskIndex = subtaskIndex;
+    this.attemptNumber = 0;
   }
 
   @Override
@@ -83,6 +86,15 @@ public class MockStreamingRuntimeContext extends 
StreamingRuntimeContextAdapter
     return numParallelSubtasks;
   }
 
+  @Override
+  public int getAttemptNumber() {
+    return this.attemptNumber;
+  }
+
+  public void setAttemptNumber(int attemptNumber) {
+    this.attemptNumber = attemptNumber;
+  }
+
   private static class MockStreamOperator extends 
AbstractStreamOperator<Integer> {
     private static final long serialVersionUID = -1153976702711944427L;
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index db8ff36962b..22280bb3129 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
 import org.apache.flink.streaming.util.MockStreamTask;
@@ -65,7 +64,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   private final Configuration conf;
 
   private final IOManager ioManager;
-  private final StreamingRuntimeContext runtimeContext;
+  private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private final StreamWriteOperatorCoordinator coordinator;
@@ -155,6 +154,8 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     }
 
     setupWriteFunction();
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
 
     if (asyncCompaction) {
       compactFunctionWrapper.openFunction();
@@ -210,12 +211,25 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     }
   }
 
+  public void jobFailover() throws Exception {
+    coordinatorFails();
+    subTaskFails(0, 0);
+  }
+
+  public void coordinatorFails() throws Exception {
+    this.coordinator.close();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+  }
+
   public void checkpointFails(long checkpointId) {
     coordinator.notifyCheckpointAborted(checkpointId);
   }
 
-  public void subTaskFails(int taskID) throws Exception {
+  public void subTaskFails(int taskID, int attemptNumber) throws Exception {
     coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+    // reset the attempt number to simulate the task failover/retries
+    this.runtimeContext.setAttemptNumber(attemptNumber);
     setupWriteFunction();
   }
 
@@ -259,9 +273,6 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
-
-    // handle the bootstrap event
-    coordinator.handleEventFromOperator(0, getNextEvent());
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index d2fe8196502..ca1761a235f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -63,6 +63,20 @@ public interface TestFunctionWrapper<I> {
    */
   void checkpointComplete(long checkpointId);
 
+  /**
+   * Triggers the job failover, including the coordinator and the write tasks.
+   */
+  default void jobFailover() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Triggers the coordinator failover separately.
+   */
+  default void coordinatorFails() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Returns the operator coordinator.
    */
@@ -92,7 +106,7 @@ public interface TestFunctionWrapper<I> {
   /**
    * Mark sub-task with id {@code taskId} as failed.
    */
-  default void subTaskFails(int taskId) throws Exception {
+  default void subTaskFails(int taskId, int attemptNumber) throws Exception {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index a6e7a19952c..5a5254d0313 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -22,12 +22,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
@@ -35,10 +39,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.hamcrest.MatcherAssert;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -56,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -177,7 +184,11 @@ public class TestWriteBase {
       final OperatorEvent nextEvent = this.pipeline.getNextEvent();
       MatcherAssert.assertThat("The operator expect to send an event", 
nextEvent, instanceOf(WriteMetadataEvent.class));
       this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
-      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed 
the event");
+      if (!((WriteMetadataEvent) nextEvent).isBootstrap()) {
+        assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator 
missed the event");
+      } else {
+        assertNull(this.pipeline.getEventBuffer()[0], "The coordinator should 
reset event buffer because of the instant initialization");
+      }
       return this;
     }
 
@@ -214,7 +225,24 @@ public class TestWriteBase {
       assertNotNull(writeStatuses);
       MatcherAssert.assertThat(writeStatuses.size(), is(0));
       this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
-      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed 
the event");
+      if (!((WriteMetadataEvent) nextEvent).isBootstrap()) {
+        assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator 
missed the event");
+      } else {
+        assertNull(this.pipeline.getEventBuffer()[0], "The coordinator should 
reset event buffer because of the instant initialization");
+      }
+      return this;
+    }
+
+    /**
+     * Assert the write task should not send the event to the coordinator.
+     */
+    public TestHarness assertNoEvent() {
+      try {
+        this.pipeline.getNextEvent();
+        throw new AssertionError("The write task should not send the event");
+      } catch (Exception ext) {
+        // ignored
+      }
       return this;
     }
 
@@ -320,13 +348,29 @@ public class TestWriteBase {
     public TestHarness subTaskFails(int taskId) throws Exception {
       // fails the subtask
       String instant1 = lastPendingInstant();
-      this.pipeline.subTaskFails(taskId);
+
+      subTaskFails(taskId, 0);
+      assertEmptyEvent();
 
       String instant2 = lastPendingInstant();
       assertNotEquals(instant2, instant1, "The previous instant should be 
rolled back when starting new instant");
       return this;
     }
 
+    /**
+     * Mark the task with id {@code taskId} as failed.
+     */
+    public TestHarness subTaskFails(int taskId, int attemptNumber) throws 
Exception {
+      // fails the subtask
+      this.pipeline.subTaskFails(taskId, attemptNumber);
+      return this;
+    }
+
+    public TestHarness jobFailover() throws Exception {
+      this.pipeline.jobFailover();
+      return this;
+    }
+
     public TestHarness noCompleteInstant() {
       // no complete instant
       checkInstantState(HoodieInstant.State.COMPLETED, null);
@@ -415,6 +459,35 @@ public class TestWriteBase {
       return this;
     }
 
+    public TestHarness rollbackLastCompleteInstantToInflight() throws 
Exception {
+      HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+      Option<HoodieInstant> lastCompletedInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), 
metaClient.getMetaPath(), lastCompletedInstant.get());
+      // refresh the heartbeat in case it is timed out.
+      OutputStream outputStream =
+          metaClient.getFs().create(new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + 
this.lastComplete), true);
+      outputStream.close();
+      this.lastPending = this.lastComplete;
+      this.lastComplete = lastCompleteInstant();
+      return this;
+    }
+
+    public TestHarness checkLastPendingInstantCompleted() {
+      checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
+      this.lastComplete = lastPending;
+      this.lastPending = lastPendingInstant();
+      return this;
+    }
+
+    /**
+     * Used to simulate the use case that the coordinator has not finished a 
new instant initialization,
+     * while the write task fails intermittently.
+     */
+    public TestHarness coordinatorFails() throws Exception {
+      this.pipeline.coordinatorFails();
+      return this;
+    }
+
     public void end() throws Exception {
       this.pipeline.close();
     }

Reply via email to