This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9bdcee0 [HUDI-2959] Fix the thread leak of cleaning service (#4252)
9bdcee0 is described below
commit 9bdcee00c010fd6d7c817ee882550dd78e35ad91
Author: Danny Chan <[email protected]>
AuthorDate: Sat Dec 11 12:08:47 2021 +0800
[HUDI-2959] Fix the thread leak of cleaning service (#4252)
---
.../org/apache/hudi/async/HoodieAsyncService.java | 36 +++++-----------------
.../hudi/client/AbstractHoodieWriteClient.java | 6 +++-
.../apache/hudi/client/AsyncCleanerService.java | 14 ++++-----
.../apache/hudi/client/HoodieFlinkWriteClient.java | 6 +++-
.../java/org/apache/hudi/sink/CleanFunction.java | 7 +++++
5 files changed, 31 insertions(+), 38 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 85e0081..f57484d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements
Serializable {
future = res.getKey();
executor = res.getValue();
started = true;
- monitorThreads(onShutdownCallback);
+ shutdownCallback(onShutdownCallback);
}
/**
@@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements
Serializable {
protected abstract Pair<CompletableFuture, ExecutorService> startService();
/**
- * A monitor thread is started which would trigger a callback if the service
is shutdown.
+ * Add shutdown callback for the completable future.
*
- * @param onShutdownCallback
+ * @param callback The callback
*/
- private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
- LOG.info("Submitting monitor thread !!");
- Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r, "Monitor Thread");
- t.setDaemon(isRunInDaemonMode());
- return t;
- }).submit(() -> {
- boolean error = false;
- try {
- LOG.info("Monitoring thread(s) !!");
- future.get();
- } catch (ExecutionException ex) {
- LOG.error("Monitor noticed one or more threads failed. Requesting
graceful shutdown of other threads", ex);
- error = true;
- } catch (InterruptedException ie) {
- LOG.error("Got interrupted Monitoring threads", ie);
- error = true;
- } finally {
- // Mark as shutdown
- shutdown = true;
- if (null != onShutdownCallback) {
- onShutdownCallback.apply(error);
- }
- shutdown(false);
+ @SuppressWarnings("unchecked")
+ private void shutdownCallback(Function<Boolean, Boolean> callback) {
+ future.whenComplete((resp, error) -> {
+ if (null != callback) {
+ callback.apply(null != error);
}
});
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 358e307..18f93fa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -424,7 +424,11 @@ public abstract class AbstractHoodieWriteClient<T extends
HoodieRecordPayload, I
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
- this.asyncCleanerService =
AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ if (null == this.asyncCleanerService) {
+ this.asyncCleanerService =
AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 2fd4251..a5a38f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG =
LogManager.getLogger(AsyncCleanerService.class);
private final AbstractHoodieWriteClient writeClient;
- private final String cleanInstantTime;
private final transient ExecutorService executor =
Executors.newSingleThreadExecutor();
- protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String
cleanInstantTime) {
+ protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
this.writeClient = writeClient;
- this.cleanInstantTime = cleanInstantTime;
}
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ LOG.info("Auto cleaning is enabled. Running cleaner async to write
operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> {
- writeClient.clean(cleanInstantTime);
+ writeClient.clean(instantTime);
return true;
- }), executor);
+ }, executor), executor);
}
public static AsyncCleanerService
startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() &&
writeClient.getConfig().isAsyncClean()) {
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
- LOG.info("Auto cleaning is enabled. Running cleaner async to write
operation at instant time " + instantTime);
- asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+ asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
} else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 374dd12..9a56d6d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
* checkpoint finish.
*/
public void startAsyncCleaning() {
- this.asyncCleanerService =
AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ if (this.asyncCleanerService == null) {
+ this.asyncCleanerService =
AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
}
/**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 77d6630..bb79006 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -98,4 +98,11 @@ public class CleanFunction<T> extends AbstractRichFunction
public void initializeState(FunctionInitializationContext context) throws
Exception {
// no operation
}
+
+ @Override
+ public void close() throws Exception {
+ if (this.writeClient != null) {
+ this.writeClient.close();
+ }
+ }
}