This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new fbc4f4dd3ed [HUDI-7447] Fix not bootstrap when subTask restart when
OPCoordinator handle CheckPointComplete not finished (#10767)
fbc4f4dd3ed is described below
commit fbc4f4dd3ede79e1e9c460c0ec30c96d98056244
Author: wenbingshen <[email protected]>
AuthorDate: Sat Mar 2 08:43:56 2024 +0800
[HUDI-7447] Fix not bootstrap when subTask restart when OPCoordinator
handle CheckPointComplete not finished (#10767)
---
.../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 12 ++++++++++++
.../test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java | 11 ++++++++++-
.../org/apache/hudi/sink/utils/InsertFunctionWrapper.java | 9 ++++++++-
.../apache/hudi/sink/utils/StreamWriteFunctionWrapper.java | 9 ++++++++-
.../java/org/apache/hudi/sink/utils/TestFunctionWrapper.java | 8 ++++++++
.../test/java/org/apache/hudi/sink/utils/TestWriteBase.java | 4 ++--
6 files changed, 48 insertions(+), 5 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 274091c88ea..8d2cf38ed0a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -435,6 +435,18 @@ public class StreamWriteOperatorCoordinator
.filter(evt -> evt.getWriteStatuses().size() > 0)
.findFirst().map(WriteMetadataEvent::getInstantTime)
.orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+
+ // if currentInstant is pending && bootstrap event instant is empty
+ // reuse currentInstant, reject bootstrap
+ if
(this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant)
+ && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)
+ && this.tableState.operationType == WriteOperationType.INSERT) {
+ LOG.warn("Reuse current pending Instant {} with {} operationType, "
+ + "ignoring empty bootstrap event.", this.instant,
WriteOperationType.INSERT.value());
+ reset();
+ return;
+ }
+
initInstant(instant);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f28dfe31456..83ca930b61d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -144,6 +144,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ // With the INSERT operation type, an unfinished instant may be rolled back
+ // only when the whole job fails over. When a single task fails and retries,
+ // the unfinished instant should be reused instead.
@Test
public void testPartialFailover() throws Exception {
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
@@ -159,7 +161,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.assertNextEvent()
// if the write task can not fetch any pending instant when starts
up(the coordinator restarts),
// it will send an event to the coordinator
- .coordinatorFails()
+ .restartCoordinator()
.subTaskFails(0, 2)
// the subtask can not fetch the instant to write until a new instant
is initialized
.checkpointThrows(4, "Timeout(1000ms) while waiting for instant
initialize")
@@ -168,6 +170,13 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
// the last checkpoint instant was rolled back by subTaskFails(0, 2)
// with EAGER cleaning strategy
.assertNoEvent()
+ .checkpoint(4)
+ .assertNextEvent()
+ .subTaskFails(0, 4)
+ // the last checkpoint instant can not be rolled back by
subTaskFails(0, 4) with INSERT write operationType
+ // because last data has been snapshot by checkpoint complete but
instant has not been committed
+ // so we need re-commit it
+ .assertEmptyEvent()
.end();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index cb144e92ba0..15634cc6e72 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -59,7 +59,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final MockStreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
private final MockOperatorCoordinatorContext coordinatorContext;
- private final StreamWriteOperatorCoordinator coordinator;
+ private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
private final boolean asyncClustering;
@@ -152,6 +152,13 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
}
+ /**
+ * Simulates a job-level failover: closes the current coordinator, replaces it
+ * with a new instance built from the same {@code conf} and
+ * {@code coordinatorContext}, starts it, and installs a mock executor.
+ *
+ * <p>NOTE(review): the field is reassigned before {@code start()} is called, so
+ * if {@code start()} throws, {@code this.coordinator} refers to an instance that
+ * was never started.
+ *
+ * @throws Exception if closing the old coordinator or starting the new one fails
+ */
+ public void restartCoordinator() throws Exception {
+ this.coordinator.close();
+ // only conf and coordinatorContext are passed on; state held solely by the old instance does not carry over
+ this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
+ this.coordinator.start();
+ // same executor setup as at the end of the preceding method in this wrapper
+ this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ }
+
+ /**
+ * Simulates a failed checkpoint by notifying the coordinator that checkpoint
+ * {@code checkpointId} was aborted.
+ *
+ * @param checkpointId id of the checkpoint to report as aborted
+ */
public void checkpointFails(long checkpointId) {
coordinator.notifyCheckpointAborted(checkpointId);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index cf801bb0d7d..c65e42f1521 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -71,7 +71,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final MockStreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
private final MockOperatorCoordinatorContext coordinatorContext;
- private final StreamWriteOperatorCoordinator coordinator;
+ private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
/**
@@ -227,6 +227,13 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
}
+ /**
+ * Simulates a job-level failover: closes the current coordinator, replaces it
+ * with a new instance built from the same {@code conf} and
+ * {@code coordinatorContext}, starts it, and installs a mock executor.
+ *
+ * <p>NOTE(review): the field is reassigned before {@code start()} is called, so
+ * if {@code start()} throws, {@code this.coordinator} refers to an instance that
+ * was never started.
+ *
+ * @throws Exception if closing the old coordinator or starting the new one fails
+ */
+ public void restartCoordinator() throws Exception {
+ this.coordinator.close();
+ // only conf and coordinatorContext are passed on; state held solely by the old instance does not carry over
+ this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
+ this.coordinator.start();
+ // same executor setup as at the end of the preceding method in this wrapper
+ this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ }
+
+ /**
+ * Simulates a failed checkpoint by notifying the coordinator that checkpoint
+ * {@code checkpointId} was aborted.
+ *
+ * @param checkpointId id of the checkpoint to report as aborted
+ */
public void checkpointFails(long checkpointId) {
coordinator.notifyCheckpointAborted(checkpointId);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index 25593d8d2fd..faee168bf25 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -82,6 +82,14 @@ public interface TestFunctionWrapper<I> {
throw new UnsupportedOperationException();
}
+ /**
+ * Simulates a job-level failover by replacing the operator coordinator with a
+ * newly created instance.
+ *
+ * <p>The default implementation does not support this; wrappers that can
+ * rebuild their coordinator override it.
+ *
+ * @throws Exception if the implementation fails to close or restart the coordinator
+ * @throws UnsupportedOperationException if the wrapper does not override this method
+ */
+ default void restartCoordinator() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Returns the operator coordinator.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index dd0db132bf8..0d668cfda5a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -479,8 +479,8 @@ public class TestWriteBase {
* Used to simulate the use case that the coordinator has not finished a
new instant initialization,
* while the write task fails intermittently.
*/
- public TestHarness coordinatorFails() throws Exception {
- this.pipeline.coordinatorFails();
+ public TestHarness restartCoordinator() throws Exception {
+ // Job-level failover: delegates to the pipeline's restartCoordinator(). The
+ // function wrappers in this patch close the old coordinator and start a fresh
+ // instance; pipeline is presumably a TestFunctionWrapper, whose default
+ // implementation throws UnsupportedOperationException.
+ // NOTE(review): the Javadoc on this method was not updated when it was renamed
+ // from coordinatorFails() — confirm it still describes this scenario.
+ this.pipeline.restartCoordinator();
return this;
}