This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d81f2bba8d1596bb0621d5992b145f2d7b6bde00 Author: wenbingshen <[email protected]> AuthorDate: Sat Mar 2 08:43:56 2024 +0800 [HUDI-7447] Fix not bootstrap when subTask restart when OPCoordinator handle CheckPointComplete not finished (#10767) --- .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 12 ++++++++++++ .../test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java | 11 ++++++++++- .../org/apache/hudi/sink/utils/InsertFunctionWrapper.java | 9 ++++++++- .../apache/hudi/sink/utils/StreamWriteFunctionWrapper.java | 9 ++++++++- .../java/org/apache/hudi/sink/utils/TestFunctionWrapper.java | 8 ++++++++ .../test/java/org/apache/hudi/sink/utils/TestWriteBase.java | 4 ++-- 6 files changed, 48 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 274091c88ea..8d2cf38ed0a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -435,6 +435,18 @@ public class StreamWriteOperatorCoordinator .filter(evt -> evt.getWriteStatuses().size() > 0) .findFirst().map(WriteMetadataEvent::getInstantTime) .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT); + + // if currentInstant is pending && bootstrap event instant is empty + // reuse currentInstant, reject bootstrap + if (this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant) + && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) + && this.tableState.operationType == WriteOperationType.INSERT) { + LOG.warn("Reuse current pending Instant {} with {} operationType, " + + "ignoring empty bootstrap event.", this.instant, WriteOperationType.INSERT.value()); + reset(); + return; + } + initInstant(instant); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index f28dfe31456..83ca930b61d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -144,6 +144,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase { .end(); } + // Only when Job level fails with INSERT operationType can we roll back the unfinished instant. + // Task level failed retry, we should reuse the unfinished Instant with INSERT operationType @Test public void testPartialFailover() throws Exception { conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L); @@ -159,7 +161,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase { .assertNextEvent() // if the write task can not fetch any pending instant when starts up(the coordinator restarts), // it will send an event to the coordinator - .coordinatorFails() + .restartCoordinator() .subTaskFails(0, 2) // the subtask can not fetch the instant to write until a new instant is initialized .checkpointThrows(4, "Timeout(1000ms) while waiting for instant initialize") @@ -168,6 +170,13 @@ public class TestWriteCopyOnWrite extends TestWriteBase { // the last checkpoint instant was rolled back by subTaskFails(0, 2) // with EAGER cleaning strategy .assertNoEvent() + .checkpoint(4) + .assertNextEvent() + .subTaskFails(0, 4) + // the last checkpoint instant can not be rolled back by subTaskFails(0, 4) with INSERT write operationType + // because last data has been snapshot by checkpoint complete but instant has not been committed + // so we need re-commit it + .assertEmptyEvent() .end(); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index cb144e92ba0..15634cc6e72 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -59,7 +59,7 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> { private final MockStreamingRuntimeContext runtimeContext; private final MockOperatorEventGateway gateway; private final MockOperatorCoordinatorContext coordinatorContext; - private final StreamWriteOperatorCoordinator coordinator; + private StreamWriteOperatorCoordinator coordinator; private final MockStateInitializationContext stateInitializationContext; private final boolean asyncClustering; @@ -152,6 +152,13 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> { this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); } + public void restartCoordinator() throws Exception { + this.coordinator.close(); + this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); + this.coordinator.start(); + this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); + } + public void checkpointFails(long checkpointId) { coordinator.notifyCheckpointAborted(checkpointId); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index cf801bb0d7d..c65e42f1521 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -71,7 +71,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> { private final MockStreamingRuntimeContext runtimeContext; private final MockOperatorEventGateway gateway; private final MockOperatorCoordinatorContext coordinatorContext; - private final StreamWriteOperatorCoordinator coordinator; + private StreamWriteOperatorCoordinator coordinator; private final MockStateInitializationContext stateInitializationContext; /** @@ -227,6 +227,13 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> { this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); } + public void restartCoordinator() throws Exception { + this.coordinator.close(); + this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); + this.coordinator.start(); + this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); + } + public void checkpointFails(long checkpointId) { coordinator.notifyCheckpointAborted(checkpointId); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java index 25593d8d2fd..faee168bf25 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java @@ -82,6 +82,14 @@ public interface TestFunctionWrapper<I> { throw new UnsupportedOperationException(); } + /** + * Triggers Job level fail, so the coordinator need re-create a new instance. + * @throws Exception + */ + default void restartCoordinator() throws Exception { + throw new UnsupportedOperationException(); + } + /** * Returns the operator coordinator. */ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index dd0db132bf8..0d668cfda5a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -479,8 +479,8 @@ public class TestWriteBase { * Used to simulate the use case that the coordinator has not finished a new instant initialization, * while the write task fails intermittently. */ - public TestHarness coordinatorFails() throws Exception { - this.pipeline.coordinatorFails(); + public TestHarness restartCoordinator() throws Exception { + this.pipeline.restartCoordinator(); return this; }
