This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f43f66fd4 [#2707] fix(server): Catch any failures in 
`calcTopNShuffleDataSize` (#2708)
f43f66fd4 is described below

commit f43f66fd4b88033689d06d5192e4a612046ef31c
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Jan 13 14:17:26 2026 +0800

    [#2707] fix(server): Catch any failures in `calcTopNShuffleDataSize` 
(#2708)
    
    ### What changes were proposed in this pull request?
    
    1. Wrap `calcTopNShuffleDataSize` in error handling
    2. Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate`
    
    ### Why are the changes needed?
    
    Fix: #2707
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI
---
 .../apache/uniffle/common/util/ThreadUtils.java    | 27 ++++++++++++++++++++--
 .../server/TopNShuffleDataSizeOfAppCalcTask.java   |  9 +++++---
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
index d84e6dc23..9f3c39098 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
@@ -54,6 +54,10 @@ public class ThreadUtils {
     return new DefaultThreadFactory(threadPoolPrefix, true);
   }
 
+  public static ThreadFactory getExceptionCatchingThreadFactory(String 
factoryName) {
+    return new ExceptionCatchingThreadFactory(getThreadFactory(factoryName));
+  }
+
   /**
    * Encapsulation of the ScheduledExecutorService
    *
@@ -62,8 +66,12 @@ public class ThreadUtils {
    */
   public static ScheduledExecutorService 
getDaemonSingleThreadScheduledExecutor(
       String factoryName) {
-    ScheduledThreadPoolExecutor executor =
-        new ScheduledThreadPoolExecutor(1, getThreadFactory(factoryName));
+    return 
getDaemonSingleThreadScheduledExecutor(getThreadFactory(factoryName));
+  }
+
+  public static ScheduledExecutorService 
getDaemonSingleThreadScheduledExecutor(
+      ThreadFactory threadFactory) {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, 
threadFactory);
     executor.setRemoveOnCancelPolicy(true);
     return executor;
   }
@@ -201,4 +209,19 @@ public class ThreadUtils {
       builder.append(info + "\n");
     }
   }
+
+  private static class ExceptionCatchingThreadFactory implements ThreadFactory 
{
+    private final ThreadFactory delegate;
+
+    ExceptionCatchingThreadFactory(ThreadFactory delegate) {
+      this.delegate = delegate;
+    }
+
+    public Thread newThread(final Runnable runnable) {
+      Thread t = delegate.newThread(runnable);
+      t.setUncaughtExceptionHandler(
+          (t1, e) -> LOGGER.error("Thread {} threw an Exception.", t1, e));
+      return t;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
 
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
index 1b22c64fc..97ffb3cf4 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.server;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -28,6 +27,8 @@ import io.prometheus.client.Gauge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.ThreadUtils;
+
 public class TopNShuffleDataSizeOfAppCalcTask {
   private static final Logger LOG = 
LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class);
 
@@ -51,7 +52,9 @@ public class TopNShuffleDataSizeOfAppCalcTask {
     this.gaugeInMemoryDataSize = 
ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
     this.gaugeOnLocalFileDataSize = 
ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
     this.gaugeOnHadoopDataSize = 
ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
-    this.scheduler = Executors.newScheduledThreadPool(1);
+    this.scheduler =
+        ThreadUtils.getDaemonSingleThreadScheduledExecutor(
+            
ThreadUtils.getExceptionCatchingThreadFactory("topN-shuffleDataSize-calc"));
   }
 
   private void calcTopNShuffleDataSize() {
@@ -127,7 +130,7 @@ public class TopNShuffleDataSizeOfAppCalcTask {
 
   public void start() {
     LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule.");
-    this.scheduler.scheduleAtFixedRate(
+    this.scheduler.scheduleWithFixedDelay(
         this::calcTopNShuffleDataSize,
         0,
         topNShuffleDataTaskRefreshInterval,

Reply via email to