This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e788ba8b33 [HUDI-6134] Prevent clean run concurrently in flink (#8568)
3e788ba8b33 is described below

commit 3e788ba8b3351ea0ca001dde8132dba0ccddc7e5
Author: Bingeng Huang <[email protected]>
AuthorDate: Tue May 16 20:32:13 2023 +0800

    [HUDI-6134] Prevent clean run concurrently in flink (#8568)
    
    Co-authored-by: hbg <[email protected]>
---
 .../src/main/java/org/apache/hudi/sink/CleanFunction.java     | 11 +++++++++--
 .../org/apache/hudi/sink/clustering/ClusteringCommitSink.java |  2 +-
 .../org/apache/hudi/sink/compact/CompactionCommitSink.java    |  2 +-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 638fe9fdab2..b674df17715 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -51,7 +51,7 @@ public class CleanFunction<T> extends AbstractRichFunction
 
   private NonThrownExecutor executor;
 
-  private volatile boolean isCleaning;
+  protected volatile boolean isCleaning;
 
   public CleanFunction(Configuration conf) {
     this.conf = conf;
@@ -64,7 +64,14 @@ public class CleanFunction<T> extends AbstractRichFunction
     this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
     LOG.info(String.format("exec clean with instant time %s...", instantTime));
-    executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
+    executor.execute(() -> {
+      this.isCleaning = true;
+      try {
+        this.writeClient.clean(instantTime);
+      } finally {
+        this.isCleaning = false;
+      }
+    }, "wait for cleaning finish");
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 3a2416248a0..ce2988b915e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -194,7 +194,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
         TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
     // whether to clean up the input base parquet files used for clustering
-    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       LOG.info("Running inline clean");
       this.writeClient.clean();
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 0e9bc54f8fb..828aa3c4265 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -175,7 +175,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.writeClient.commitCompaction(instant, metadata, Option.empty());
 
     // Whether to clean up the old log file when compaction
-    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       this.writeClient.clean();
     }
   }

Reply via email to