This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 26232dee301 [HUDI-6501] StreamWriteOperatorCoordinator should recommit
with starting heartbeat for lazy failed writes clean policy (#9135)
26232dee301 is described below
commit 26232dee301876add9ca7d8a5c7dfd4c4966595d
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Jul 7 10:36:09 2023 +0800
[HUDI-6501] StreamWriteOperatorCoordinator should recommit with starting
heartbeat for lazy failed writes clean policy (#9135)
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 9 +++++++++
.../hudi/sink/TestStreamWriteOperatorCoordinator.java | 17 +++++++++++++++++
2 files changed, 26 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index df473584841..39f5dbd75d7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -412,6 +412,10 @@ public class StreamWriteOperatorCoordinator
reset();
} else {
LOG.info("Recommit instant {}", instant);
+ // Recommit should start heartbeat for lazy failed writes clean policy
to avoid aborting for heartbeat expired.
+ if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+ writeClient.getHeartbeatClient().start(instant);
+ }
commitInstant(instant);
}
// starts a new instant
@@ -595,6 +599,11 @@ public class StreamWriteOperatorCoordinator
return context;
}
+ @VisibleForTesting
+ public HoodieFlinkWriteClient getWriteClient() {
+ return writeClient;
+ }
+
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) throws Exception {
if (this.executor != null) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d26dc587a3f..c4b8f9eb204 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -183,6 +185,21 @@ public class TestStreamWriteOperatorCoordinator {
assertThat("Recommits the instant with partial uncommitted events",
lastCompleted, is(instant));
}
+ @Test
+ public void testRecommitWithLazyFailedWritesCleanPolicy() {
+
coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY,
HoodieFailedWritesCleaningPolicy.LAZY.name());
+
assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
+ final CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ String instant = coordinator.getInstant();
+ WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false,
0.2);
+ event1.setBootstrap(true);
+ WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
+ coordinator.handleEventFromOperator(0, event1);
+ coordinator.handleEventFromOperator(1, event2);
+ assertThat("Recommits the instant with lazy failed writes clean policy",
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant));
+ }
+
@Test
public void testHiveSyncInvoked() throws Exception {
// reset