This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cd1924bf64 [HUDI-6944] Fix flink boostrap concurrency issue (#9867)
8cd1924bf64 is described below

commit 8cd1924bf64af03c4ccb0be00cc8aa012375b40c
Author: YueZhang <[email protected]>
AuthorDate: Mon Oct 16 19:03:07 2023 +0800

    [HUDI-6944] Fix flink boostrap concurrency issue (#9867)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 30 ++++++++++------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index fcb2b1dc295..34fb5cf3c67 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -409,23 +409,21 @@ public class StreamWriteOperatorCoordinator
    */
   private void initInstant(String instant) {
     HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
-    executor.execute(() -> {
-      if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
-        // the last instant committed successfully
-        reset();
-      } else {
-        LOG.info("Recommit instant {}", instant);
-        // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
-        if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
-          writeClient.getHeartbeatClient().start(instant);
-        }
-        commitInstant(instant);
+    if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
+      // the last instant committed successfully
+      reset();
+    } else {
+      LOG.info("Recommit instant {}", instant);
+      // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
+      if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+        writeClient.getHeartbeatClient().start(instant);
       }
-      // starts a new instant
-      startInstant();
-      // upgrade downgrade
-      this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
-    }, "initialize instant %s", instant);
+      commitInstant(instant);
+    }
+    // starts a new instant
+    startInstant();
+    // upgrade downgrade
+    this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
   }
 
   private void handleBootstrapEvent(WriteMetadataEvent event) {

Reply via email to