This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 053e9f0b380 [HUDI-6036] Add more tests for task and coordinator
interaction (#8383)
053e9f0b380 is described below
commit 053e9f0b3804c799beb17c07c81c56d06a1f8c03
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 5 20:30:11 2023 +0800
[HUDI-6036] Add more tests for task and coordinator interaction (#8383)
---
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 36 +++++++++-
.../utils/BucketStreamWriteFunctionWrapper.java | 9 ---
.../hudi/sink/utils/MockOperatorStateStore.java | 4 ++
.../sink/utils/MockStateInitializationContext.java | 2 +-
.../sink/utils/MockStreamingRuntimeContext.java | 12 ++++
.../sink/utils/StreamWriteFunctionWrapper.java | 23 +++++--
.../hudi/sink/utils/TestFunctionWrapper.java | 16 ++++-
.../org/apache/hudi/sink/utils/TestWriteBase.java | 79 +++++++++++++++++++++-
8 files changed, 160 insertions(+), 21 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f705a63f1ff..9f88dd24140 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -67,7 +67,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
* Override to have custom configuration.
*/
protected void setUp(Configuration conf) {
- // for sub-class extension
+ // for subclass extension
}
@Test
@@ -109,12 +109,46 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
@Test
public void testSubtaskFails() throws Exception {
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
// open the function and ingest data
preparePipeline()
.checkpoint(1)
.assertEmptyEvent()
.subTaskFails(0)
.noCompleteInstant()
+ // write a commit and check the result
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED1)
+ // triggers task 0 failover, there is no pending instant that needs to
recommit,
+ // the task sends an empty bootstrap event to trigger initialization
of a new instant.
+ .subTaskFails(0, 0)
+ .assertEmptyEvent()
+ // rollback the last complete instant to inflight state, to simulate
an instant commit failure
+ // while executing the post action of a checkpoint success
notification event, the whole job should then
+ // trigger a failover.
+ .rollbackLastCompleteInstantToInflight()
+ .jobFailover()
+ .assertNextEvent()
+ .checkLastPendingInstantCompleted()
+ // triggers subtask failure for multiple times to simulate partial
failover, for partial failover,
+ // we allow the task to reuse the pending instant for data flushing,
no metadata event should be sent
+ .subTaskFails(0, 1)
+ .assertNoEvent()
+ // the subtask reuses the pending instant
+ .checkpoint(3)
+ .assertNextEvent()
+ // if the write task can not fetch any pending instant when it starts
up (the coordinator restarts),
+ // it will send an event to the coordinator
+ .coordinatorFails()
+ .subTaskFails(0, 2)
+ // the subtask can not fetch the instant to write until a new instant
is initialized
+ .checkpointThrows(4, "Timeout(1000ms) while waiting for instant
initialize")
+ .assertEmptyEvent()
+ .subTaskFails(0, 3)
+ .assertEmptyEvent()
.end();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index 49d6b4b1e2b..3be484a49be 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -165,15 +165,6 @@ public class BucketStreamWriteFunctionWrapper<I>
implements TestFunctionWrapper<
}
}
- public void checkpointFails(long checkpointId) {
- coordinator.notifyCheckpointAborted(checkpointId);
- }
-
- public void subTaskFails(int taskID) throws Exception {
- coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
- setupWriteFunction();
- }
-
public void close() throws Exception {
coordinator.close();
ioManager.close();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
index cc16be4f0eb..e0d0188c129 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java
@@ -121,6 +121,10 @@ public class MockOperatorStateStore implements
KeyedStateStore, OperatorStateSto
lastSuccessStateMap = historyStateMap.get(checkpointId);
}
+ public boolean isRestored() {
+ return !this.currentStateMap.isEmpty();
+ }
+
public void rollBackToLastSuccessCheckpoint() {
this.currentStateMap = copyStates(lastSuccessStateMap);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
index 945d1bbbe75..e218f29df6f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java
@@ -37,7 +37,7 @@ public class MockStateInitializationContext implements
StateInitializationContex
@Override
public boolean isRestored() {
- return false;
+ return operatorStateStore.isRestored();
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
index 7c5b79700e4..888e349bdd9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
@@ -44,6 +44,8 @@ public class MockStreamingRuntimeContext extends
StreamingRuntimeContextAdapter
private final int numParallelSubtasks;
private final int subtaskIndex;
+ private int attemptNumber;
+
public MockStreamingRuntimeContext(
boolean isCheckpointingEnabled,
int numParallelSubtasks,
@@ -66,6 +68,7 @@ public class MockStreamingRuntimeContext extends
StreamingRuntimeContextAdapter
this.isCheckpointingEnabled = isCheckpointingEnabled;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
+ this.attemptNumber = 0;
}
@Override
@@ -83,6 +86,15 @@ public class MockStreamingRuntimeContext extends
StreamingRuntimeContextAdapter
return numParallelSubtasks;
}
+ @Override
+ public int getAttemptNumber() {
+ return this.attemptNumber;
+ }
+
+ public void setAttemptNumber(int attemptNumber) {
+ this.attemptNumber = attemptNumber;
+ }
+
private static class MockStreamOperator extends
AbstractStreamOperator<Integer> {
private static final long serialVersionUID = -1153976702711944427L;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index db8ff36962b..22280bb3129 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -42,7 +42,6 @@ import
org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.util.MockStreamTask;
@@ -65,7 +64,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final Configuration conf;
private final IOManager ioManager;
- private final StreamingRuntimeContext runtimeContext;
+ private final MockStreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
private final MockOperatorCoordinatorContext coordinatorContext;
private final StreamWriteOperatorCoordinator coordinator;
@@ -155,6 +154,8 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
setupWriteFunction();
+ // handle the bootstrap event
+ coordinator.handleEventFromOperator(0, getNextEvent());
if (asyncCompaction) {
compactFunctionWrapper.openFunction();
@@ -210,12 +211,25 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
}
+ public void jobFailover() throws Exception {
+ coordinatorFails();
+ subTaskFails(0, 0);
+ }
+
+ public void coordinatorFails() throws Exception {
+ this.coordinator.close();
+ this.coordinator.start();
+ this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ }
+
public void checkpointFails(long checkpointId) {
coordinator.notifyCheckpointAborted(checkpointId);
}
- public void subTaskFails(int taskID) throws Exception {
+ public void subTaskFails(int taskID, int attemptNumber) throws Exception {
coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+ // reset the attempt number to simulate the task failover/retries
+ this.runtimeContext.setAttemptNumber(attemptNumber);
setupWriteFunction();
}
@@ -259,9 +273,6 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
writeFunction.open(conf);
-
- // handle the bootstrap event
- coordinator.handleEventFromOperator(0, getNextEvent());
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index d2fe8196502..ca1761a235f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -63,6 +63,20 @@ public interface TestFunctionWrapper<I> {
*/
void checkpointComplete(long checkpointId);
+ /**
+ * Triggers the job failover, including the coordinator and the write tasks.
+ */
+ default void jobFailover() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Triggers the coordinator failover separately.
+ */
+ default void coordinatorFails() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Returns the operator coordinator.
*/
@@ -92,7 +106,7 @@ public interface TestFunctionWrapper<I> {
/**
* Mark sub-task with id {@code taskId} as failed.
*/
- default void subTaskFails(int taskId) throws Exception {
+ default void subTaskFails(int taskId, int attemptNumber) throws Exception {
throw new UnsupportedOperationException();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index a6e7a19952c..5a5254d0313 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -22,12 +22,16 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
@@ -35,10 +39,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.hamcrest.MatcherAssert;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -56,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -177,7 +184,11 @@ public class TestWriteBase {
final OperatorEvent nextEvent = this.pipeline.getNextEvent();
MatcherAssert.assertThat("The operator expect to send an event",
nextEvent, instanceOf(WriteMetadataEvent.class));
this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed
the event");
+ if (!((WriteMetadataEvent) nextEvent).isBootstrap()) {
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator
missed the event");
+ } else {
+ assertNull(this.pipeline.getEventBuffer()[0], "The coordinator should
reset event buffer because of the instant initialization");
+ }
return this;
}
@@ -214,7 +225,24 @@ public class TestWriteBase {
assertNotNull(writeStatuses);
MatcherAssert.assertThat(writeStatuses.size(), is(0));
this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
- assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed
the event");
+ if (!((WriteMetadataEvent) nextEvent).isBootstrap()) {
+ assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator
missed the event");
+ } else {
+ assertNull(this.pipeline.getEventBuffer()[0], "The coordinator should
reset event buffer because of the instant initialization");
+ }
+ return this;
+ }
+
+ /**
+ * Assert the write task should not send the event to the coordinator.
+ */
+ public TestHarness assertNoEvent() {
+ try {
+ this.pipeline.getNextEvent();
+ throw new AssertionError("The write task should not send the event");
+ } catch (Exception ext) {
+ // ignored
+ }
return this;
}
@@ -320,13 +348,29 @@ public class TestWriteBase {
public TestHarness subTaskFails(int taskId) throws Exception {
// fails the subtask
String instant1 = lastPendingInstant();
- this.pipeline.subTaskFails(taskId);
+
+ subTaskFails(taskId, 0);
+ assertEmptyEvent();
String instant2 = lastPendingInstant();
assertNotEquals(instant2, instant1, "The previous instant should be
rolled back when starting new instant");
return this;
}
+ /**
+ * Mark the task with id {@code taskId} as failed.
+ */
+ public TestHarness subTaskFails(int taskId, int attemptNumber) throws
Exception {
+ // fails the subtask
+ this.pipeline.subTaskFails(taskId, attemptNumber);
+ return this;
+ }
+
+ public TestHarness jobFailover() throws Exception {
+ this.pipeline.jobFailover();
+ return this;
+ }
+
public TestHarness noCompleteInstant() {
// no complete instant
checkInstantState(HoodieInstant.State.COMPLETED, null);
@@ -415,6 +459,35 @@ public class TestWriteBase {
return this;
}
+ public TestHarness rollbackLastCompleteInstantToInflight() throws
Exception {
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ Option<HoodieInstant> lastCompletedInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(),
metaClient.getMetaPath(), lastCompletedInstant.get());
+ // refresh the heartbeat in case it is timed out.
+ OutputStream outputStream =
+ metaClient.getFs().create(new
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR +
this.lastComplete), true);
+ outputStream.close();
+ this.lastPending = this.lastComplete;
+ this.lastComplete = lastCompleteInstant();
+ return this;
+ }
+
+ public TestHarness checkLastPendingInstantCompleted() {
+ checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
+ this.lastComplete = lastPending;
+ this.lastPending = lastPendingInstant();
+ return this;
+ }
+
+ /**
+ * Used to simulate the use case that the coordinator has not finished a
new instant initialization,
+ * while the write task fails intermittently.
+ */
+ public TestHarness coordinatorFails() throws Exception {
+ this.pipeline.coordinatorFails();
+ return this;
+ }
+
public void end() throws Exception {
this.pipeline.close();
}