This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6fe9f969289 [HUDI-6588] Fix duplicate fileId on TM partial-failover and recovery (#9357)
6fe9f969289 is described below
commit 6fe9f969289c8dd9eaeb843a8f10623f03ab5b59
Author: voonhous <[email protected]>
AuthorDate: Fri Aug 4 16:37:42 2023 +0800
[HUDI-6588] Fix duplicate fileId on TM partial-failover and recovery (#9357)
Co-authored-by: danny0405 <[email protected]>
---
.../hudi/sink/append/AppendWriteFunction.java | 15 +++++++++++++
.../sink/common/AbstractStreamWriteFunction.java | 12 +----------
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 9 ++++++++
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 5 +++++
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 5 +++++
.../hudi/sink/utils/InsertFunctionWrapper.java | 25 +++++++++++++++++-----
6 files changed, 55 insertions(+), 16 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index e1db125731c..91c59341109 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -95,6 +95,21 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     this.writeStatuses.clear();
   }
 
+  protected void sendBootstrapEvent() {
+    int attemptId = getRuntimeContext().getAttemptNumber();
+    if (attemptId > 0) {
+      // either a partial or global failover, reuses the current inflight instant
+      if (this.currentInstant != null) {
+        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
+        this.currentInstant = null;
+        return;
+      }
+      // the JM may have also been rebooted, sends the bootstrap event either
+    }
+    this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
+    LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+  }
+
   // -------------------------------------------------------------------------
   // GetterSetter
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index cbb8851a940..3bd19fa0699 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -213,17 +213,7 @@ public abstract class AbstractStreamWriteFunction<I>
       }
     }
 
-  private void sendBootstrapEvent() {
-    int attemptId = getRuntimeContext().getAttemptNumber();
-    if (attemptId > 0) {
-      // either a partial or global failover, reuses the current inflight instant
-      if (this.currentInstant != null) {
-        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
-        this.currentInstant = null;
-        return;
-      }
-      // the JM may have also been rebooted, sends the bootstrap event either
-    }
+  protected void sendBootstrapEvent() {
     this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index ea8b8b75ce5..90aa86cd353 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -141,6 +141,15 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .jobFailover()
         .assertNextEvent()
         .checkLastPendingInstantCompleted()
+        .end();
+  }
+
+  @Test
+  public void testPartialFailover() throws Exception {
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
+    conf.setString(FlinkOptions.OPERATION, "INSERT");
+    // open the function and ingest data
+    preparePipeline()
         // triggers subtask failure for multiple times to simulate partial failover, for partial over,
         // we allow the task to reuse the pending instant for data flushing, no metadata event should be sent
         .subTaskFails(0, 1)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index d968362ca2a..94214661ea8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -143,6 +143,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testPartialFailover() {
+    // partial failover is only valid for append mode.
+  }
+
   @Test
   public void testInsertAppendMode() {
     // append mode is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index f6ee44db254..abc5679367a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -38,6 +38,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
+  @Test
+  public void testPartialFailover() {
+    // partial failover is only valid for append mode.
+  }
+
   @Test
   public void testInsertAppendMode() {
     // append mode is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 5778a0375e9..cb144e92ba0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -57,7 +56,7 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private final Configuration conf;
   private final RowType rowType;
 
-  private final StreamingRuntimeContext runtimeContext;
+  private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private final StreamWriteOperatorCoordinator coordinator;
@@ -102,6 +101,8 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
 
     setupWriteFunction();
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
 
     if (asyncClustering) {
       clusteringFunctionWrapper.openFunction();
@@ -145,6 +146,23 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     }
   }
 
+  public void coordinatorFails() throws Exception {
+    this.coordinator.close();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public void subTaskFails(int taskID, int attemptNumber) throws Exception {
+    coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+    // reset the attempt number to simulate the task failover/retries
+    this.runtimeContext.setAttemptNumber(attemptNumber);
+    setupWriteFunction();
+  }
+
   public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
@@ -171,8 +189,5 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
-
-    // handle the bootstrap event
-    coordinator.handleEventFromOperator(0, getNextEvent());
   }
 }