This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89651c9  [HUDI-2421] Catch the throwable when scheduling the cleaning task for flink writer (#3650)
89651c9 is described below

commit 89651c94085f3f775328e5fbc2113aa9d1a6a962
Author: Danny Chan <[email protected]>
AuthorDate: Mon Sep 13 20:43:44 2021 +0800

    [HUDI-2421] Catch the throwable when scheduling the cleaning task for flink writer (#3650)
---
 hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 1ca593f..e75fad5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -81,8 +81,13 @@ public class CleanFunction<T> extends AbstractRichFunction
   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
     if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
-      this.writeClient.startAsyncCleaning();
-      this.isCleaning = true;
+      try {
+        this.writeClient.startAsyncCleaning();
+        this.isCleaning = true;
+      } catch (Throwable throwable) {
+        // catch the exception to not affect the normal checkpointing
+        LOG.warn("Error while start async cleaning", throwable);
+      }
     }
   }
 

Reply via email to