This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3e788ba8b33 [HUDI-6134] Prevent clean run concurrently in flink (#8568)
3e788ba8b33 is described below
commit 3e788ba8b3351ea0ca001dde8132dba0ccddc7e5
Author: Bingeng Huang <[email protected]>
AuthorDate: Tue May 16 20:32:13 2023 +0800
[HUDI-6134] Prevent clean run concurrently in flink (#8568)
Co-authored-by: hbg <[email protected]>
---
.../src/main/java/org/apache/hudi/sink/CleanFunction.java | 11 +++++++++--
.../org/apache/hudi/sink/clustering/ClusteringCommitSink.java | 2 +-
.../org/apache/hudi/sink/compact/CompactionCommitSink.java | 2 +-
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 638fe9fdab2..b674df17715 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -51,7 +51,7 @@ public class CleanFunction<T> extends AbstractRichFunction
private NonThrownExecutor executor;
- private volatile boolean isCleaning;
+ protected volatile boolean isCleaning;
public CleanFunction(Configuration conf) {
this.conf = conf;
@@ -64,7 +64,14 @@ public class CleanFunction<T> extends AbstractRichFunction
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info(String.format("exec clean with instant time %s...", instantTime));
- executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
+ executor.execute(() -> {
+ this.isCleaning = true;
+ try {
+ this.writeClient.clean(instantTime);
+ } finally {
+ this.isCleaning = false;
+ }
+ }, "wait for cleaning finish");
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 3a2416248a0..ce2988b915e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -194,7 +194,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(),
table, instant);
// whether to clean up the input base parquet files used for clustering
- if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+ if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
LOG.info("Running inline clean");
this.writeClient.clean();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 0e9bc54f8fb..828aa3c4265 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -175,7 +175,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
this.writeClient.commitCompaction(instant, metadata, Option.empty());
// Whether to clean up the old log file when compaction
- if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+ if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
this.writeClient.clean();
}
}