This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c744848c59 [HUDI-4366] Synchronous cleaning for flink bounded source 
(#6051)
c744848c59 is described below

commit c744848c5915c6e8b50d77c2502e772e76107ca7
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 8 09:55:07 2022 +0800

    [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
---
 .../main/java/org/apache/hudi/configuration/OptionsResolver.java  | 8 ++++++++
 .../src/main/java/org/apache/hudi/sink/CleanFunction.java         | 4 ++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4cfa0bc92a..64bd91f480 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -164,4 +164,12 @@ public class OptionsResolver {
   public static boolean sortClusteringEnabled(Configuration conf) {
     return 
!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
   }
+
+  /**
+   * Returns whether the operation is INSERT OVERWRITE (table or partition).
+   */
+  public static boolean isInsertOverwrite(Configuration conf) {
+    return 
conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
+        || 
conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 65f07d7c7a..1c827517ff 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -19,9 +19,9 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -65,7 +65,7 @@ public class CleanFunction<T> extends AbstractRichFunction
       this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
       this.executor = 
NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
 
-      if 
(conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value()))
 {
+      if (OptionsResolver.isInsertOverwrite(conf)) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime();
         LOG.info(String.format("exec sync clean with instant time %s...", 
instantTime));
         executor.execute(() -> writeClient.clean(instantTime), "wait for sync 
cleaning finish");

Reply via email to