This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f3775a1c3 [#1588] improvement(server): Add exception handling for the 
thread pool when flushing events (#1589)
f3775a1c3 is described below

commit f3775a1c3aaad494017330f837f7694c850e6dd0
Author: RickyMa <[email protected]>
AuthorDate: Mon Mar 18 17:40:56 2024 +0800

    [#1588] improvement(server): Add exception handling for the thread pool 
when flushing events (#1589)
    
    ### What changes were proposed in this pull request?
    
    Add exception handling for the thread pool when flushing events.
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1588.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../java/org/apache/uniffle/server/DefaultFlushEventHandler.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 2ff85bab2..f32a96101 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -214,7 +215,13 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
         ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.inc();
       }
 
-      dedicatedExecutor.execute(() -> handleEventAndUpdateMetrics(event, 
storage));
+      CompletableFuture.runAsync(
+              () -> handleEventAndUpdateMetrics(event, storage), 
dedicatedExecutor)
+          .exceptionally(
+              e -> {
+                LOG.error("Exception happened when handling event and updating 
metrics.", e);
+                return null;
+              });
     } catch (Exception e) {
       LOG.error("Exception happened when pushing events to dedicated event 
handler.", e);
     }

Reply via email to