This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f43f66fd4 [#2707] fix(server): Catch up on any failures in
`calcTopNShuffleDataSize` (#2708)
f43f66fd4 is described below
commit f43f66fd4b88033689d06d5192e4a612046ef31c
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Jan 13 14:17:26 2026 +0800
[#2707] fix(server): Catch up on any failures in `calcTopNShuffleDataSize`
(#2708)
### What changes were proposed in this pull request?
1. Wrap calcTopNShuffleDataSize in error handling
2. Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate`
### Why are the changes needed?
Fix: #2707
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI
---
.../apache/uniffle/common/util/ThreadUtils.java | 27 ++++++++++++++++++++--
.../server/TopNShuffleDataSizeOfAppCalcTask.java | 9 +++++---
2 files changed, 31 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
index d84e6dc23..9f3c39098 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
@@ -54,6 +54,10 @@ public class ThreadUtils {
return new DefaultThreadFactory(threadPoolPrefix, true);
}
+ public static ThreadFactory getExceptionCatchingThreadFactory(String
factoryName) {
+ return new ExceptionCatchingThreadFactory(getThreadFactory(factoryName));
+ }
+
/**
* Encapsulation of the ScheduledExecutorService
*
@@ -62,8 +66,12 @@ public class ThreadUtils {
*/
public static ScheduledExecutorService
getDaemonSingleThreadScheduledExecutor(
String factoryName) {
- ScheduledThreadPoolExecutor executor =
- new ScheduledThreadPoolExecutor(1, getThreadFactory(factoryName));
+ return
getDaemonSingleThreadScheduledExecutor(getThreadFactory(factoryName));
+ }
+
+ public static ScheduledExecutorService
getDaemonSingleThreadScheduledExecutor(
+ ThreadFactory threadFactory) {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
threadFactory);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
@@ -201,4 +209,19 @@ public class ThreadUtils {
builder.append(info + "\n");
}
}
+
+ private static class ExceptionCatchingThreadFactory implements ThreadFactory
{
+ private final ThreadFactory delegate;
+
+ ExceptionCatchingThreadFactory(ThreadFactory delegate) {
+ this.delegate = delegate;
+ }
+
+ public Thread newThread(final Runnable runnable) {
+ Thread t = delegate.newThread(runnable);
+ t.setUncaughtExceptionHandler(
+ (t1, e) -> LOGGER.error("Thread {} threw an Exception.", t1, e));
+ return t;
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
index 1b22c64fc..97ffb3cf4 100644
---
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
+++
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.server;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -28,6 +27,8 @@ import io.prometheus.client.Gauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.ThreadUtils;
+
public class TopNShuffleDataSizeOfAppCalcTask {
private static final Logger LOG =
LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class);
@@ -51,7 +52,9 @@ public class TopNShuffleDataSizeOfAppCalcTask {
this.gaugeInMemoryDataSize =
ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
this.gaugeOnLocalFileDataSize =
ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
this.gaugeOnHadoopDataSize =
ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
- this.scheduler = Executors.newScheduledThreadPool(1);
+ this.scheduler =
+ ThreadUtils.getDaemonSingleThreadScheduledExecutor(
+
ThreadUtils.getExceptionCatchingThreadFactory("topN-shuffleDataSize-calc"));
}
private void calcTopNShuffleDataSize() {
@@ -127,7 +130,7 @@ public class TopNShuffleDataSizeOfAppCalcTask {
public void start() {
LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule.");
- this.scheduler.scheduleAtFixedRate(
+ this.scheduler.scheduleWithFixedDelay(
this::calcTopNShuffleDataSize,
0,
topNShuffleDataTaskRefreshInterval,