This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7d3aa9dd [HUDI-4880] Fix corrupted parquet file issue left over by cancelled compaction task (#6733)
0d7d3aa9dd is described below

commit 0d7d3aa9ddd8acde8dbdc1e91fd4842273664705
Author: Teng <[email protected]>
AuthorDate: Thu Nov 3 10:28:27 2022 +0800

    [HUDI-4880] Fix corrupted parquet file issue left over by cancelled compaction task (#6733)
---
 .../org/apache/hudi/sink/clustering/ClusteringOperator.java  |  5 ++++-
 .../java/org/apache/hudi/sink/compact/CompactFunction.java   | 12 ++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 228b924fc4..837b06a89a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -180,7 +180,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
+    if (null != this.executor) {
+      this.executor.close();
+    }
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 14c5c16941..f14bc8aa61 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -133,4 +133,16 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
   }
+
+  @Override
+  public void close() throws Exception {
+    if (null != this.executor) {
+      this.executor.close();
+    }
+    if (null != this.writeClient) {
+      this.writeClient.cleanHandlesGracefully();
+      this.writeClient.close();
+      this.writeClient = null;
+    }
+  }
 }

Reply via email to