This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26232dee301 [HUDI-6501] StreamWriteOperatorCoordinator should recommit with starting heartbeat for lazy failed writes clean policy (#9135)
26232dee301 is described below

commit 26232dee301876add9ca7d8a5c7dfd4c4966595d
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Jul 7 10:36:09 2023 +0800

    [HUDI-6501] StreamWriteOperatorCoordinator should recommit with starting heartbeat for lazy failed writes clean policy (#9135)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java       |  9 +++++++++
 .../hudi/sink/TestStreamWriteOperatorCoordinator.java   | 17 +++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index df473584841..39f5dbd75d7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -412,6 +412,10 @@ public class StreamWriteOperatorCoordinator
         reset();
       } else {
         LOG.info("Recommit instant {}", instant);
+        // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
+        if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+          writeClient.getHeartbeatClient().start(instant);
+        }
         commitInstant(instant);
       }
       // starts a new instant
@@ -595,6 +599,11 @@ public class StreamWriteOperatorCoordinator
     return context;
   }
 
+  @VisibleForTesting
+  public HoodieFlinkWriteClient getWriteClient() {
+    return writeClient;
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) throws Exception {
     if (this.executor != null) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d26dc587a3f..c4b8f9eb204 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -183,6 +185,21 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testRecommitWithLazyFailedWritesCleanPolicy() {
+    coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, HoodieFailedWritesCleaningPolicy.LAZY.name());
+    assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    String instant = coordinator.getInstant();
+    WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
+    event1.setBootstrap(true);
+    WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
+    coordinator.handleEventFromOperator(0, event1);
+    coordinator.handleEventFromOperator(1, event2);
+    assertThat("Recommits the instant with lazy failed writes clean policy", TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant));
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset

Reply via email to