This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e500dc597 [#1956] fix(server): Prevent file deletion hang in case of
disk corruption (#1958)
e500dc597 is described below
commit e500dc5973e51723504909c2e48fd2e945e71f5e
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jul 26 16:20:19 2024 +0800
[#1956] fix(server): Prevent file deletion hang in case of disk corruption
(#1958)
### What changes were proposed in this pull request?
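Prevent file deletion from hanging in case of disk corruption: storage resource removal in ShuffleTaskManager now runs with a timeout, and LocalStorageManager skips corrupted local storages when collecting the paths to delete.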
### Why are the changes needed?
Fix: #1956
### Does this PR introduce _any_ user-facing change?
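Yes. Adds the config option `rss.server.storage.resourceRemoveOperationTimeoutSec` (default: 600 seconds), the timeout for the storage remove-resource operation.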
### How was this patch tested?
Unit tests (added `ShuffleTaskManagerTest#testStorageRemoveResourceHang`).
---
.../apache/uniffle/server/ShuffleServerConf.java | 6 ++
.../apache/uniffle/server/ShuffleTaskManager.java | 73 +++++++++++++++++++---
.../server/storage/LocalStorageManager.java | 4 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 55 ++++++++++++++++
4 files changed, 128 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index e504cd2e3..92e5a46fa 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -636,6 +636,12 @@ public class ShuffleServerConf extends RssBaseConf {
"A comma-separated block size list, where each value"
+ " can be suffixed with a memory size unit, such as kb or
k, mb or m, etc.");
+ public static final ConfigOption<Long>
STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC =
+ ConfigOptions.key("rss.server.storage.resourceRemoveOperationTimeoutSec")
+ .longType()
+ .defaultValue(10 * 60L)
+ .withDescription("The storage remove resource operation timeout.");
+
public ShuffleServerConf() {}
public ShuffleServerConf(String fileName) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index b258c8a11..f90ec7c8a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -26,13 +26,17 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
@@ -61,6 +65,7 @@ import org.apache.uniffle.common.exception.NoBufferException;
import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.future.CompletableFutureExtension;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.Constants;
@@ -92,7 +97,7 @@ public class ShuffleTaskManager {
private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
private ScheduledExecutorService triggerFlushExecutorService;
private final TopNShuffleDataSizeOfAppCalcTask
topNShuffleDataSizeOfAppCalcTask;
- private final StorageManager storageManager;
+ private StorageManager storageManager;
private AtomicLong requireBufferId = new AtomicLong(0);
private ShuffleServerConf conf;
private long appExpiredWithoutHB;
@@ -113,6 +118,7 @@ public class ShuffleTaskManager {
private Thread clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
private final Cache<String, ReentrantReadWriteLock> appLocks;
+ private final long storageRemoveOperationTimeoutSec;
public ShuffleTaskManager(
ShuffleServerConf conf,
@@ -127,6 +133,8 @@ public class ShuffleTaskManager {
this.appExpiredWithoutHB =
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
this.commitCheckIntervalMax =
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
this.preAllocationExpired =
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
+ this.storageRemoveOperationTimeoutSec =
+
conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
this.leakShuffleDataCheckInterval =
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
this.triggerFlushInterval =
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
@@ -766,8 +774,18 @@ public class ShuffleTaskManager {
});
shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
- storageManager.removeResources(
- new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
+
+ String operationMsg =
+ String.format("removing storage data for appId:%s, shuffleIds:%s",
appId, shuffleIds);
+ withTimeoutExecution(
+ () -> {
+ storageManager.removeResources(
+ new ShufflePurgeEvent(appId, getUserByAppId(appId),
shuffleIds));
+ return null;
+ },
+ storageRemoveOperationTimeoutSec,
+ operationMsg);
+
LOG.info(
"Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
appId,
@@ -811,12 +829,21 @@ public class ShuffleTaskManager {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
- storageManager.removeResources(
- new AppPurgeEvent(
- appId,
- shuffleTaskInfo.getUser(),
- new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
- checkAppExpired));
+
+ String operationMsg = String.format("removing storage data for
appId:%s", appId);
+ withTimeoutExecution(
+ () -> {
+ storageManager.removeResources(
+ new AppPurgeEvent(
+ appId,
+ shuffleTaskInfo.getUser(),
+ new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
+ checkAppExpired));
+ return null;
+ },
+ storageRemoveOperationTimeoutSec,
+ operationMsg);
+
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
ShuffleServerMetrics.gaugeHugePartitionNum.dec();
@@ -832,6 +859,28 @@ public class ShuffleTaskManager {
}
}
+ private void withTimeoutExecution(
+ Supplier supplier, long timeoutSec, String operationDetailedMsg) {
+ CompletableFuture<Void> future =
+ CompletableFuture.supplyAsync(supplier,
Executors.newSingleThreadExecutor());
+ CompletableFuture extended =
+ CompletableFutureExtension.orTimeout(future, timeoutSec,
TimeUnit.SECONDS);
+ try {
+ extended.get();
+ } catch (Exception e) {
+ if (e instanceof ExecutionException) {
+ if (e.getCause() instanceof TimeoutException) {
+ LOG.error(
+ "Errors on finishing operation of [{}] in the {}(sec). This
should not happen!",
+ operationDetailedMsg,
+ timeoutSec);
+ return;
+ }
+ throw new RssException(e);
+ }
+ }
+ }
+
public void refreshAppId(String appId) {
shuffleTaskInfos
.computeIfAbsent(
@@ -940,4 +989,10 @@ public class ShuffleTaskManager {
public void start() {
clearResourceThread.start();
}
+
+ // only for tests
+ @VisibleForTesting
+ protected void setStorageManager(StorageManager storageManager) {
+ this.storageManager = storageManager;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 159f64f27..10f831ad1 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -290,7 +290,9 @@ public class LocalStorageManager extends
SingleStorageManager {
StorageType.LOCALFILE.name(), new Configuration()));
List<String> deletePaths =
- storageBasePaths.stream()
+ localStorages.stream()
+ .filter(x -> !x.isCorrupted())
+ .map(x -> x.getBasePath())
.flatMap(
path -> {
String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(path, appId);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 75c49cd4d..c9cbdf49a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -40,6 +40,7 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -49,6 +50,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
@@ -60,6 +62,7 @@ import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HadoopTestBase;
@@ -68,6 +71,7 @@ import
org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
+import static org.apache.uniffle.common.StorageType.MEMORY_LOCALFILE;
import static
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,6 +81,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
public class ShuffleTaskManagerTest extends HadoopTestBase {
@@ -1154,6 +1161,54 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
assertEquals(30, ShuffleTaskManager.getMaxConcurrencyWriting(40, conf));
}
+ @Timeout(10)
+ @Test
+ public void testStorageRemoveResourceHang(@TempDir File tmpDir) throws
Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ final String storageBasePath = tmpDir.getAbsolutePath() +
"rss/testStorageRemoveResourceHang";
+ conf.set(RssBaseConf.RSS_STORAGE_TYPE, MEMORY_LOCALFILE);
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC,
2L);
+
+ shuffleServer = new ShuffleServer(conf);
+ ShuffleTaskManager shuffleTaskManager =
shuffleServer.getShuffleTaskManager();
+
+ String appId = "appId1";
+ shuffleTaskManager.registerShuffle(
+ appId,
+ 1,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+ StringUtils.EMPTY);
+ shuffleTaskManager.refreshAppId(appId);
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.refreshAppId(appId);
+ shuffleTaskManager.checkResourceStatus();
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ // get the underlying localfile storage manager to simulate hang
+ LocalStorageManager storageManager = (LocalStorageManager)
shuffleServer.getStorageManager();
+ storageManager = spy(storageManager);
+ doAnswer(
+ x -> {
+ Thread.sleep(100000);
+ return null;
+ })
+ .when(storageManager)
+ .removeResources(any(PurgeEvent.class));
+ shuffleTaskManager.setStorageManager(storageManager);
+
+ shuffleTaskManager.removeResources(appId, false);
+ }
+
@Test
public void testRegisterShuffleAfterAppIsExpired() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();