This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d7d3aa9dd [HUDI-4880] Fix corrupted parquet file issue left over by
cancelled compaction task (#6733)
0d7d3aa9dd is described below
commit 0d7d3aa9ddd8acde8dbdc1e91fd4842273664705
Author: Teng <[email protected]>
AuthorDate: Thu Nov 3 10:28:27 2022 +0800
[HUDI-4880] Fix corrupted parquet file issue left over by cancelled
compaction task (#6733)
---
.../org/apache/hudi/sink/clustering/ClusteringOperator.java | 5 ++++-
.../java/org/apache/hudi/sink/compact/CompactFunction.java | 12 ++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 228b924fc4..837b06a89a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -180,7 +180,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
}
@Override
- public void close() {
+ public void close() throws Exception {
+ if (null != this.executor) {
+ this.executor.close();
+ }
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 14c5c16941..f14bc8aa61 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -133,4 +133,16 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
}
+
+ @Override
+ public void close() throws Exception {
+ if (null != this.executor) {
+ this.executor.close();
+ }
+ if (null != this.writeClient) {
+ this.writeClient.cleanHandlesGracefully();
+ this.writeClient.close();
+ this.writeClient = null;
+ }
+ }
}