This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9737d5790 [#1564] fix(server): disk health check invalid when hang
(#1568)
9737d5790 is described below
commit 9737d5790218cfc9c8be6439fd48bb7af98430c3
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Mar 13 11:24:34 2024 +0800
[#1564] fix(server): disk health check invalid when hang (#1568)
### What changes were proposed in this pull request?
Run each disk check in a `CompletableFuture` with a timeout. If a check does
not finish in time, the disk is directly marked as corrupted.
### Why are the changes needed?
Fix: #1564
### Does this PR introduce _any_ user-facing change?
Yes. Introduces the config
`rss.server.health.checker.localStorageExecutionTimeoutMS` (unit: ms, default: 1 minute).
### How was this patch tested?
Unit tests. I will also apply this to our production environment.
---
.../common/future/CompletableFutureExtension.java | 88 ++++++++++++++++++++
.../future/CompletableFutureExtensionTest.java | 71 ++++++++++++++++
.../apache/uniffle/server/LocalStorageChecker.java | 95 +++++++++++++++-------
.../apache/uniffle/server/ShuffleServerConf.java | 7 ++
.../uniffle/server/LocalStorageCheckerTest.java | 69 ++++++++++++++++
5 files changed, 302 insertions(+), 28 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureExtension.java
b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureExtension.java
new file mode 100644
index 000000000..22948c89d
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureExtension.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.future;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+
+/**
+ * Timeout helpers for {@link CompletableFuture}.
+ *
+ * <p>The nested Timeout / Canceller / Delayer classes mirror the structure of the JDK 9+
+ * implementation of {@code CompletableFuture#orTimeout}, presumably so the same behavior is
+ * available on Java 8.
+ */
+public final class CompletableFutureExtension {
+
+  private CompletableFutureExtension() {}
+
+  /**
+   * Arranges for {@code future} to be completed exceptionally with a {@link TimeoutException} if
+   * it is still incomplete after the given timeout.
+   *
+   * <p>The timeout only completes the future. It does not cancel or interrupt whatever task is
+   * producing the value; that task keeps running.
+   *
+   * @param future the future to guard
+   * @param timeout how long to wait before timing out, in units of {@code unit}
+   * @param unit the unit of {@code timeout}
+   * @return {@code future} itself if it is already done; otherwise a dependent stage that
+   *     completes with the same result or failure as {@code future}. On timeout, {@code get()} on
+   *     it throws an {@link java.util.concurrent.ExecutionException} caused by a {@link
+   *     TimeoutException}.
+   */
+  public static <T> CompletableFuture<T> orTimeout(
+      CompletableFuture<T> future, long timeout, TimeUnit unit) {
+    if (future.isDone()) {
+      return future;
+    }
+
+    ScheduledFuture<?> pendingTimeout = Delayer.delay(new Timeout(future), timeout, unit);
+    return future.whenComplete(new Canceller(pendingTimeout));
+  }
+
+  /**
+   * Scheduled task that fails the guarded future with a {@link TimeoutException} if it is still
+   * pending when the task runs.
+   */
+  static final class Timeout implements Runnable {
+    final CompletableFuture<?> future;
+
+    Timeout(CompletableFuture<?> future) {
+      this.future = future;
+    }
+
+    @Override
+    public void run() {
+      if (null != future && !future.isDone()) {
+        future.completeExceptionally(new TimeoutException());
+      }
+    }
+  }
+
+  /** Completion callback that cancels the scheduled {@link Timeout} once the future completes. */
+  static final class Canceller implements BiConsumer<Object, Throwable> {
+    final Future<?> future;
+
+    Canceller(Future<?> future) {
+      this.future = future;
+    }
+
+    @Override
+    public void accept(Object ignore, Throwable ex) {
+      // Cancel regardless of how the guarded future completed. Cancelling only on normal
+      // completion (ex == null) would leave the timer task, and its reference to the future,
+      // queued in the shared scheduler until the deadline whenever the future fails for another
+      // reason. When the timeout itself completed the future, this runs inside the timer task;
+      // cancel(false) never interrupts it, so that case is harmless.
+      if (null != future && !future.isDone()) {
+        future.cancel(false);
+      }
+    }
+  }
+
+  /** Holds the single shared daemon scheduler thread that runs every pending timeout. */
+  static final class Delayer {
+    static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
+      return delayer.schedule(command, delay, unit);
+    }
+
+    static final class DaemonThreadFactory implements ThreadFactory {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        // Daemon, so a pending timeout never keeps the JVM alive.
+        t.setDaemon(true);
+        t.setName("CompletableFutureExtensionDelayScheduler");
+        return t;
+      }
+    }
+
+    static final ScheduledThreadPoolExecutor delayer;
+
+    static {
+      delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
+      // Remove cancelled timeouts from the work queue immediately instead of at their deadline.
+      delayer.setRemoveOnCancelPolicy(true);
+    }
+  }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureExtensionTest.java
b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureExtensionTest.java
new file mode 100644
index 000000000..ec120003b
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureExtensionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.future;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CompletableFutureExtensionTest {
+
+  @Test
+  public void timeoutExceptionTest() throws ExecutionException, InterruptedException {
+    // case1: a future that is never completed must fail with a TimeoutException once the
+    // deadline passes. An explicitly-managed future (instead of a sleeping supplier racing the
+    // timeout) keeps the test fast and deterministic.
+    CompletableFuture<Integer> future = new CompletableFuture<>();
+    CompletableFuture<Integer> wrap =
+        CompletableFutureExtension.orTimeout(future, 100, TimeUnit.MILLISECONDS);
+    try {
+      wrap.get();
+      fail("Expected the wrapped future to time out");
+    } catch (ExecutionException e) {
+      if (!(e.getCause() instanceof TimeoutException)) {
+        fail("Expected a TimeoutException cause but got " + e.getCause());
+      }
+    }
+
+    // case2: a future completed before its deadline keeps its value.
+    future = new CompletableFuture<>();
+    wrap = CompletableFutureExtension.orTimeout(future, 3, TimeUnit.SECONDS);
+    future.complete(10);
+    assertEquals(10, wrap.get());
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index ac30baff9..0554035d6 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -21,8 +21,15 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +41,8 @@ import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.future.CompletableFutureExtension;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -48,8 +57,10 @@ public class LocalStorageChecker extends Checker {
private final double diskMaxUsagePercentage;
private final double diskRecoveryUsagePercentage;
private final double minStorageHealthyPercentage;
- private final List<StorageInfo> storageInfos = Lists.newArrayList();
+  // Non-private and non-final so tests can replace the list built by the constructor with stub
+  // StorageInfo instances (see SlowDiskStorageChecker in LocalStorageCheckerTest).
+  protected List<StorageInfo> storageInfos = Lists.newArrayList();
   private boolean isHealthy = true;
+  // Runs the per-storage checks concurrently; sized to basePaths.size() in the constructor.
+  // NOTE(review): orTimeout only fails the returned future and never interrupts the task, so a
+  // check that truly hangs keeps its worker thread blocked. If a disk stays hung across repeated
+  // checks, each round pins another worker until all are stuck; after that every disk's check
+  // times out and is marked corrupted. Consider interrupting or replacing stuck workers.
+  private final ExecutorService workers;
+  // Maximum time, in milliseconds, a single storage check may take before its disk is marked
+  // corrupted.
+  private final long diskCheckerExecutionTimeoutMs;
public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage>
storages) {
super(conf);
@@ -72,6 +83,10 @@ public class LocalStorageChecker extends Checker {
conf.getDouble(ShuffleServerConf.HEALTH_STORAGE_RECOVERY_USAGE_PERCENTAGE);
this.minStorageHealthyPercentage =
conf.getDouble(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE);
+
+ this.diskCheckerExecutionTimeoutMs =
+
conf.getLong(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT);
+ this.workers = Executors.newFixedThreadPool(basePaths.size());
}
@Override
@@ -81,36 +96,60 @@ public class LocalStorageChecker extends Checker {
AtomicLong wholeDiskUsedSpace = new AtomicLong(0L);
AtomicLong serviceUsedSpace = new AtomicLong(0L);
AtomicInteger corruptedDirs = new AtomicInteger(0);
- CountDownLatch cdl = new CountDownLatch(storageInfos.size());
- storageInfos
- .parallelStream()
- .forEach(
- storageInfo -> {
- if (!storageInfo.checkStorageReadAndWrite()) {
- storageInfo.markCorrupted();
- corruptedDirs.incrementAndGet();
- cdl.countDown();
- return;
- }
- long total = getTotalSpace(storageInfo.storageDir);
- long free = getFreeSpace(storageInfo.storageDir);
+ Map<StorageInfo, CompletableFuture<Void>> futureMap = new HashMap<>();
+ for (StorageInfo storageInfo : storageInfos) {
+ CompletableFuture<Void> storageCheckFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ if (!storageInfo.checkStorageReadAndWrite()) {
+ storageInfo.markCorrupted();
+ corruptedDirs.incrementAndGet();
+ return null;
+ }
- totalSpace.addAndGet(total);
- wholeDiskUsedSpace.addAndGet(total - free);
-
serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
+ long total = getTotalSpace(storageInfo.storageDir);
+ long free = getFreeSpace(storageInfo.storageDir);
- storageInfo.updateStorageFreeSpace(free);
- if (storageInfo.checkIsSpaceEnough(total, free)) {
- num.incrementAndGet();
- }
- cdl.countDown();
- });
- try {
- cdl.await();
- } catch (InterruptedException e) {
- LOG.error("Failed to check local storage!");
+ totalSpace.addAndGet(total);
+ wholeDiskUsedSpace.addAndGet(total - free);
+
serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
+
+ storageInfo.updateStorageFreeSpace(free);
+ if (storageInfo.checkIsSpaceEnough(total, free)) {
+ num.incrementAndGet();
+ }
+ return null;
+ },
+ workers);
+
+ futureMap.put(
+ storageInfo,
+ CompletableFutureExtension.orTimeout(
+ storageCheckFuture, diskCheckerExecutionTimeoutMs,
TimeUnit.MILLISECONDS));
+ }
+
+ for (Map.Entry<StorageInfo, CompletableFuture<Void>> entry :
futureMap.entrySet()) {
+ StorageInfo storageInfo = entry.getKey();
+ CompletableFuture<Void> f = entry.getValue();
+
+ try {
+ f.get();
+ } catch (Exception e) {
+ if (e instanceof ExecutionException) {
+ if (e.getCause() instanceof TimeoutException) {
+ storageInfo.markCorrupted();
+ LOG.error(
+ "Timeout of checking local storage: {}. This should not happen
and mark this disk corrupted.",
+ storageInfo.storage.getBasePath());
+ continue;
+ }
+ }
+
+ throw new RssException(e);
+ }
}
+
ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace.get());
ShuffleServerMetrics.gaugeLocalStorageWholeDiskUsedSpace.set(wholeDiskUsedSpace.get());
ShuffleServerMetrics.gaugeLocalStorageServiceUsedSpace.set(serviceUsedSpace.get());
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index f968a1722..a6cd26547 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -306,6 +306,13 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(5000L)
.withDescription("The health script file execute timeout ms.");
+  /**
+   * Upper bound, in milliseconds, on one run of a single local storage check performed by
+   * LocalStorageChecker. A check that exceeds it causes that disk to be marked corrupted.
+   * Defaults to one minute.
+   */
+  public static final ConfigOption<Long> HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT =
+      ConfigOptions.key("rss.server.health.checker.localStorageExecutionTimeoutMS")
+          .longType()
+          .defaultValue(60 * 1000L)
+          .withDescription(
+              "The health checker for LocalStorageChecker execution timeout (Unit: ms). Default value is 1min");
+
public static final ConfigOption<Double>
SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE =
ConfigOptions.key("rss.server.memory.shuffle.lowWaterMark.percentage")
.doubleType()
diff --git
a/server/src/test/java/org/apache/uniffle/server/LocalStorageCheckerTest.java
b/server/src/test/java/org/apache/uniffle/server/LocalStorageCheckerTest.java
index e5eaba11d..9336ce956 100644
---
a/server/src/test/java/org/apache/uniffle/server/LocalStorageCheckerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/LocalStorageCheckerTest.java
@@ -20,12 +20,81 @@ package org.apache.uniffle.server;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.uniffle.common.StorageType;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.storage.common.LocalStorage;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
public class LocalStorageCheckerTest {
+ // Registers ShuffleServerMetrics once before any test in this class runs. Presumably
+ // required because LocalStorageChecker.checkIsHealthy updates ShuffleServerMetrics gauges.
+ @BeforeAll
+ public static void setup() {
+ ShuffleServerMetrics.register();
+ }
+
+ // Calls ShuffleServerMetrics.clear() after all tests in this class, undoing the
+ // registration done in setup() (presumably so other test classes can register again).
+ @AfterAll
+ public static void clear() {
+ ShuffleServerMetrics.clear();
+ }
+
+  /**
+   * A LocalStorageChecker whose storage read/write probe sleeps for {@code hangTimeSec} seconds
+   * before succeeding, used to simulate a hung disk.
+   */
+  private class SlowDiskStorageChecker extends LocalStorageChecker {
+    private final long hangTimeSec;
+
+    SlowDiskStorageChecker(ShuffleServerConf conf, List<LocalStorage> storages, long hangTimeSec) {
+      super(conf, storages);
+      this.hangTimeSec = hangTimeSec;
+
+      // Replace the StorageInfo list built by the parent constructor with slow stubs.
+      List<StorageInfo> storageInfoList =
+          storages.stream().map(x -> new SlowStorageInfo(x)).collect(Collectors.toList());
+      super.storageInfos = storageInfoList;
+    }
+
+    /** StorageInfo whose read/write check blocks for {@code hangTimeSec} seconds. */
+    private class SlowStorageInfo extends StorageInfo {
+
+      SlowStorageInfo(LocalStorage storage) {
+        super(storage);
+      }
+
+      @Override
+      public boolean checkStorageReadAndWrite() {
+        try {
+          Thread.sleep(hangTimeSec * 1000);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before bailing out so the worker thread still observes it.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        return true;
+      }
+    }
+  }
+
+  @Test
+  @Timeout(10)
+  public void testCheckingStorageHang(@TempDir File tempDir) {
+    // The probe hangs for 600s, far beyond the 2s execution timeout configured below, so the
+    // health check must stop waiting and report the server as unhealthy within the test timeout.
+    String basePath = tempDir.getAbsolutePath();
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
+    conf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE);
+    conf.set(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT, 2000L);
+
+    List<LocalStorage> storages =
+        Arrays.asList(LocalStorage.newBuilder().basePath(basePath).capacity(100000L).build());
+    assertFalse(new SlowDiskStorageChecker(conf, storages, 600).checkIsHealthy());
+  }
@Test
public void testGetUniffleUsedSpace(@TempDir File tempDir) throws
IOException {