This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f3775a1c3 [#1588] improvement(server): Add exception handling for the
thread pool when flushing events (#1589)
f3775a1c3 is described below
commit f3775a1c3aaad494017330f837f7694c850e6dd0
Author: RickyMa <[email protected]>
AuthorDate: Mon Mar 18 17:40:56 2024 +0800
[#1588] improvement(server): Add exception handling for the thread pool
when flushing events (#1589)
### What changes were proposed in this pull request?
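Add exception handling for the thread pool when flushing events: the flush task is now submitted to the dedicated executor via `CompletableFuture.runAsync`, and an `exceptionally` callback logs any exception thrown while handling the event and updating metrics.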
### Why are the changes needed?
This change addresses https://github.com/apache/incubator-uniffle/issues/1588.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by the existing unit tests.
---
.../java/org/apache/uniffle/server/DefaultFlushEventHandler.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 2ff85bab2..f32a96101 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -214,7 +215,13 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.inc();
}
- dedicatedExecutor.execute(() -> handleEventAndUpdateMetrics(event,
storage));
+ CompletableFuture.runAsync(
+ () -> handleEventAndUpdateMetrics(event, storage),
dedicatedExecutor)
+ .exceptionally(
+ e -> {
+ LOG.error("Exception happened when handling event and updating
metrics.", e);
+ return null;
+ });
} catch (Exception e) {
LOG.error("Exception happened when pushing events to dedicated event
handler.", e);
}