This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c09cbd1b [ISSUE-380] Refactor the flush process to fix fallback fail
(#383)
c09cbd1b is described below
commit c09cbd1b707c3c4b3edfedb4e1db2ecba9c17287
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 6 15:18:06 2022 +0800
[ISSUE-380] Refactor the flush process to fix fallback fail (#383)
### What changes were proposed in this pull request?
1. Avoid partial events being pushed into the pending queue when using
`MultiStorageManager`
2. Introduce the cleanupCallback of `ShuffleDataFlushEvent` to scope all
cleanup operations within the event lifecycle.
3. Make the concrete under storage bound to data-flush event to avoid
duplicate invoking `selectStorage` method after writing.
4. Refactor the whole flush process into a `while` loop, making it simpler
and clearer.
5. Add the metric of `total_failed_written_event_num`
### Why are the changes needed?
To fix the invalid fallback that occurs when a data-flush event enters the pending queue
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Simple UTs
2. Integration tests
---
...st.java => MultiStorageFaultToleranceBase.java} | 39 ++---
.../uniffle/test/MultiStorageHdfsFallbackTest.java | 56 ++++++++
.../test/MultiStorageLocalfileFallbackTest.java | 66 +++++++++
.../client/factory/ShuffleServerClientFactory.java | 6 +
.../uniffle/server/ShuffleDataFlushEvent.java | 47 ++++++
.../apache/uniffle/server/ShuffleFlushManager.java | 157 +++++++++++----------
.../uniffle/server/ShuffleServerMetrics.java | 3 +
.../uniffle/server/buffer/ShuffleBuffer.java | 1 +
.../server/buffer/ShuffleBufferManager.java | 1 +
.../uniffle/server/storage/HdfsStorageManager.java | 4 +-
.../server/storage/LocalStorageManager.java | 1 +
.../server/storage/MultiStorageManager.java | 55 +++-----
.../server/storage/SingleStorageManager.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 76 +++++++++-
.../uniffle/server/ShuffleServerMetricsTest.java | 1 -
.../server/storage/MultiStorageManagerTest.java | 18 ++-
.../uniffle/storage/common/LocalStorage.java | 6 +
.../handler/impl/LocalFileClientReadHandler.java | 2 +-
18 files changed, 392 insertions(+), 149 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
similarity index 76%
rename from
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
rename to
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
index 6e0145d8..acdbf23d 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.test;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,11 +26,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -44,37 +43,17 @@ import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
+public abstract class MultiStorageFaultToleranceBase extends
ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";
- @BeforeAll
- public static void setupServers() throws Exception {
- final CoordinatorConf coordinatorConf = getCoordinatorConf();
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- String basePath = generateBasePath();
- shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
- shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE,
100.0);
- shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L *
100);
- shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC,
30L);
- shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS,
5000L);
-
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
60L * 1000L * 60L);
- shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L *
1000L);
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE_HDFS.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
-
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
400L * 1024L * 1024L);
- createAndStartServers(shuffleServerConf, coordinatorConf);
- }
-
@BeforeEach
public void createClient() {
+ ShuffleServerClientFactory.getInstance().cleanupCache();
shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST,
SHUFFLE_SERVER_PORT);
}
@@ -83,19 +62,19 @@ public class MultiStorageFaultToleranceTest extends
ShuffleReadWriteBase {
shuffleServerClient.close();
}
+ abstract void makeChaos();
+
@Test
- public void hdfsFallbackTest() throws Exception {
- String appId = "fallback_test";
+ public void fallbackTest() throws Exception {
+ String appId = "fallback_test_" + this.getClass().getSimpleName();
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
registerShuffle(appId, map);
Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
final List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 40, 10 * 1024 * 1024, blockBitmap, expectedData);
- assertEquals(1, cluster.getDataNodes().size());
- cluster.stopDataNode(0);
- assertEquals(0, cluster.getDataNodes().size());
+ 0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
+ makeChaos();
sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks);
validateResult(appId, 0, 0, blockBitmap,
Roaring64NavigableMap.bitmapOf(0), expectedData);
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
new file mode 100644
index 00000000..63ec7ac7
--- /dev/null
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MultiStorageHdfsFallbackTest extends
MultiStorageFaultToleranceBase {
+
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ final CoordinatorConf coordinatorConf = getCoordinatorConf();
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ String basePath = generateBasePath();
+ shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
+ shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE,
100.0);
+ shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L *
100);
+ shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC,
30L);
+ shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS,
5000L);
+
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
60L * 1000L * 60L);
+ shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L *
1000L);
+ shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE_HDFS.name());
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
+
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
1L * 1024L * 1024L);
+ createAndStartServers(shuffleServerConf, coordinatorConf);
+ }
+
+ @Override
+ public void makeChaos() {
+ assertEquals(1, cluster.getDataNodes().size());
+ cluster.stopDataNode(0);
+ assertEquals(0, cluster.getDataNodes().size());
+ }
+}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
new file mode 100644
index 00000000..7d70d017
--- /dev/null
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.storage.LocalStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.MultiStorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class MultiStorageLocalfileFallbackTest extends
MultiStorageFaultToleranceBase {
+
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ final CoordinatorConf coordinatorConf = getCoordinatorConf();
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ String basePath = generateBasePath();
+ shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
+ shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE,
100.0);
+ shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L *
100);
+ shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC,
30L);
+ shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS,
5000L);
+
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
60L * 1000L * 60L);
+ shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L *
1000L);
+ shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE_HDFS.name());
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
+
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
1000L * 1024L * 1024L);
+ shuffleServerConf.setString(
+ ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+ LocalStorageManagerFallbackStrategy.class.getCanonicalName()
+ );
+ createAndStartServers(shuffleServerConf, coordinatorConf);
+ }
+
+ @Override
+ public void makeChaos() {
+ LocalStorageManager warmStorageManager =
+ (LocalStorageManager)
((MultiStorageManager)shuffleServers.get(0).getStorageManager()).getWarmStorageManager();
+ for (Storage storage : warmStorageManager.getStorages()) {
+ LocalStorage localStorage = (LocalStorage) storage;
+ localStorage.markSpaceFull();
+ }
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 4337b855..6d82e126 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -59,4 +59,10 @@ public class ShuffleServerClientFactory {
}
return serverToClients.get(shuffleServerInfo);
}
+
+ // Only for tests
+ public synchronized void cleanupCache() {
+ clients.values().stream().flatMap(x ->
x.values().stream()).forEach(ShuffleServerClient::close);
+ this.clients = Maps.newConcurrentMap();
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 1fe5ba9d..883ef14a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -17,14 +17,20 @@
package org.apache.uniffle.server;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.storage.common.Storage;
public class ShuffleDataFlushEvent {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ShuffleDataFlushEvent.class);
private final long eventId;
private final String appId;
@@ -37,6 +43,10 @@ public class ShuffleDataFlushEvent {
private final ShuffleBuffer shuffleBuffer;
private final AtomicInteger retryTimes = new AtomicInteger();
+ private boolean isPended = false;
+ private Storage underStorage;
+ private final List<Runnable> cleanupCallbackChains;
+
public ShuffleDataFlushEvent(
long eventId,
String appId,
@@ -56,6 +66,7 @@ public class ShuffleDataFlushEvent {
this.shuffleBlocks = shuffleBlocks;
this.valid = valid;
this.shuffleBuffer = shuffleBuffer;
+ this.cleanupCallbackChains = new ArrayList<>();
}
public List<ShufflePartitionedBlock> getShuffleBlocks() {
@@ -105,6 +116,42 @@ public class ShuffleDataFlushEvent {
retryTimes.incrementAndGet();
}
+ public boolean isPended() {
+ return isPended;
+ }
+
+ public void markPended() {
+ isPended = true;
+ }
+
+ public Storage getUnderStorage() {
+ return underStorage;
+ }
+
+ public void setUnderStorage(Storage underStorage) {
+ this.underStorage = underStorage;
+ }
+
+ public boolean doCleanup() {
+ boolean ret = true;
+ for (Runnable cleanupCallback : cleanupCallbackChains) {
+ try {
+ cleanupCallback.run();
+ } catch (Exception e) {
+ ret = false;
+ LOGGER.error("Errors doing cleanup callback. event: {}", this, e);
+ }
+ }
+ return ret;
+ }
+
+ public void addCleanupCallback(
+ Runnable cleanupCallback) {
+ if (cleanupCallback != null) {
+ cleanupCallbackChains.add(cleanupCallback);
+ }
+ }
+
@Override
public String toString() {
return "ShuffleDataFlushEvent: eventId=" + eventId
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index b2a67853..367fdaff 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -40,7 +40,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ThreadUtils;
-import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -143,86 +143,100 @@ public class ShuffleFlushManager {
}
private void flushToFile(ShuffleDataFlushEvent event) {
-
- Storage storage = storageManager.selectStorage(event);
- if (storage != null && !storage.canWrite()) {
- addPendingEvents(event);
- return;
- }
-
long start = System.currentTimeMillis();
- List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
boolean writeSuccess = false;
- try {
- // storage info maybe null if the application cache was cleared already
- if (storage != null) {
- if (blocks == null || blocks.isEmpty()) {
- LOG.info("There is no block to be flushed: " + event);
- } else if (!event.isValid()) {
- // avoid printing error log
+
+ while (event.getRetryTimes() <= retryMax) {
+ try {
+ if (!event.isValid()) {
writeSuccess = true;
LOG.warn("AppId {} was removed already, event {} should be dropped",
event.getAppId(), event);
- } else {
- String user = StringUtils.defaultString(
-
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
- StringUtils.EMPTY
- );
- CreateShuffleWriteHandlerRequest request = new
CreateShuffleWriteHandlerRequest(
- storageType,
- event.getAppId(),
- event.getShuffleId(),
- event.getStartPartition(),
- event.getEndPartition(),
- storageBasePaths.toArray(new
String[storageBasePaths.size()]),
- shuffleServerId,
- hadoopConf,
- storageDataReplica,
- user);
- ShuffleWriteHandler handler =
storage.getOrCreateWriteHandler(request);
- do {
- if (event.getRetryTimes() > retryMax) {
- LOG.error("Failed to write data for " + event + " in " +
retryMax + " times, shuffle data will be lost");
-
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
- break;
- }
- if (!event.isValid()) {
- LOG.warn("AppId {} was removed already, event {} should be
dropped, may leak one handler",
- event.getAppId(), event);
- // avoid printing error log
- writeSuccess = true;
- break;
- }
+ break;
+ }
+
+ List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+ if (blocks == null || blocks.isEmpty()) {
+ LOG.info("There is no block to be flushed: {}", event);
+ break;
+ }
- writeSuccess = storageManager.write(storage, handler, event);
+ Storage storage = storageManager.selectStorage(event);
+ if (storage == null) {
+ break;
+ }
- if (writeSuccess) {
- updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
blocks);
-
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+ if (!storage.canWrite()) {
+ // todo: Could we add an interface supportPending for storageManager
+ // to unify following logic of multiple different storage
managers
+ if (storageManager instanceof MultiStorageManager) {
+ event.increaseRetryTimes();
+
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+ continue;
+ } else {
+ // To avoid being re-pushed to pending queue and make the server
too much pressure,
+ // it's better to drop directly.
+ if (event.isPended()) {
+ LOG.error("Drop this event directly due to already having
entered pending queue. event: {}", event);
break;
- } else {
- event.increaseRetryTimes();
-
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
}
- } while (event.getRetryTimes() <= retryMax);
+ event.increaseRetryTimes();
+ event.markPended();
+ addPendingEvents(event);
+ return;
+ }
}
- }
- } catch (Exception e) {
- // just log the error, don't throw the exception and stop the flush
thread
- LOG.error("Exception happened when process flush shuffle data for " +
event, e);
- } finally {
- cleanupFlushEventData(event);
- if (shuffleServer != null) {
- long duration = System.currentTimeMillis() - start;
+
+ String user = StringUtils.defaultString(
+
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+ StringUtils.EMPTY
+ );
+ CreateShuffleWriteHandlerRequest request = new
CreateShuffleWriteHandlerRequest(
+ storageType,
+ event.getAppId(),
+ event.getShuffleId(),
+ event.getStartPartition(),
+ event.getEndPartition(),
+ storageBasePaths.toArray(new String[storageBasePaths.size()]),
+ shuffleServerId,
+ hadoopConf,
+ storageDataReplica,
+ user);
+ ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+ writeSuccess = storageManager.write(storage, handler, event);
if (writeSuccess) {
- LOG.debug("Flush to file success in " + duration + " ms and release
" + event.getSize() + " bytes");
+ updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
blocks);
+
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+ break;
} else {
- LOG.error("Flush to file for " + event + " failed in "
- + duration + " ms and release " + event.getSize() + " bytes");
+ event.increaseRetryTimes();
+
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
}
+ } catch (Throwable throwable) {
+ // just log the error, don't throw the exception and stop the flush
thread
+ LOG.error("Exception happened when process flush shuffle data for {}",
event, throwable);
+ event.increaseRetryTimes();
+ }
+ }
+
+ if (event.getRetryTimes() > retryMax) {
+ LOG.error("Failed to write data for {} in {} times, shuffle data will be
lost", event, retryMax);
+ if (event.getUnderStorage() != null) {
+
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
}
}
- }
+ event.doCleanup();
+ if (shuffleServer != null) {
+ long duration = System.currentTimeMillis() - start;
+ if (writeSuccess) {
+ LOG.debug("Flush to file success in {} ms and release {} bytes",
duration, event.getSize());
+ } else {
+ ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+ LOG.error("Flush to file for {} failed in {} ms and release {} bytes",
event, duration, event.getSize());
+ }
+ }
+ }
+
private void updateCommittedBlockIds(String appId, int shuffleId,
List<ShufflePartitionedBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
return;
@@ -309,19 +323,12 @@ public class ShuffleFlushManager {
}
private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
- ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
- if (shuffleBuffer != null) {
- shuffleBuffer.clearInFlushBuffer(event.getEventId());
- }
- if (shuffleServer != null) {
- shuffleServer.getShuffleBufferManager().releaseMemory(
- event.getSize(), true, false);
- }
+ event.doCleanup();
}
private void dropPendingEvent(PendingShuffleFlushEvent event) {
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
- cleanupFlushEventData(event.getEvent());
+ event.getEvent().doCleanup();
}
@VisibleForTesting
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 0bc8d903..60071ef3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -71,6 +71,7 @@ public class ShuffleServerMetrics {
private static final String USED_BUFFER_SIZE = "used_buffer_size";
private static final String TOTAL_UPLOAD_SIZE = "total_upload_size";
private static final String TOTAL_UPLOAD_TIME_S = "total_upload_time_s";
+ private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM =
"total_failed_written_event_num";
private static final String TOTAL_DROPPED_EVENT_NUM =
"total_dropped_event_num";
private static final String TOTAL_HDFS_WRITE_DATA = "total_hdfs_write_data";
private static final String TOTAL_LOCALFILE_WRITE_DATA =
"total_localfile_write_data";
@@ -102,6 +103,7 @@ public class ShuffleServerMetrics {
public static Counter counterTotalReadTime;
public static Counter counterTotalUploadSize;
public static Counter counterTotalUploadTimeS;
+ public static Counter counterTotalFailedWrittenEventNum;
public static Counter counterTotalDroppedEventNum;
public static Counter counterTotalHdfsWriteDataSize;
public static Counter counterTotalLocalFileWriteDataSize;
@@ -251,6 +253,7 @@ public class ShuffleServerMetrics {
counterTotalUploadSize = metricsManager.addCounter(TOTAL_UPLOAD_SIZE);
counterTotalUploadTimeS = metricsManager.addCounter(TOTAL_UPLOAD_TIME_S);
counterTotalDroppedEventNum =
metricsManager.addCounter(TOTAL_DROPPED_EVENT_NUM);
+ counterTotalFailedWrittenEventNum =
metricsManager.addCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
counterTotalHdfsWriteDataSize =
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
counterTotalLocalFileWriteDataSize =
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
counterTotalRequireBufferFailed =
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 37e497a8..874bcaae 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -104,6 +104,7 @@ public class ShuffleBuffer {
spBlocks,
isValid,
this);
+ event.addCleanupCallback(() ->
this.clearInFlushBuffer(event.getEventId()));
inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks.clear();
size = 0;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c3606a08..b606d30c 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -218,6 +218,7 @@ public class ShuffleBufferManager {
shuffleFlushManager.getDataDistributionType(appId)
);
if (event != null) {
+ event.addCleanupCallback(() -> releaseMemory(event.getSize(), true,
false));
updateShuffleSize(appId, shuffleId, -event.getSize());
inFlushSize.addAndGet(event.getSize());
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 2151e75d..09c7dd03 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -70,7 +70,9 @@ public class HdfsStorageManager extends SingleStorageManager {
@Override
public Storage selectStorage(ShuffleDataFlushEvent event) {
- return getStorageByAppId(event.getAppId());
+ Storage storage = getStorageByAppId(event.getAppId());
+ event.setUnderStorage(storage);
+ return storage;
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index db0f47e3..4e47a34f 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -151,6 +151,7 @@ public class LocalStorageManager extends
SingleStorageManager {
if (storage.isCorrupted()) {
storage = getRepairedStorage(event.getAppId(), event.getShuffleId(),
event.getStartPartition());
}
+ event.setUnderStorage(storage);
return storage;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index adb84369..ee53ff03 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.server.storage;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -35,7 +34,6 @@ import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
-import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
public class MultiStorageManager implements StorageManager {
@@ -45,7 +43,7 @@ public class MultiStorageManager implements StorageManager {
private final StorageManager coldStorageManager;
private final long flushColdStorageThresholdSize;
private AbstractStorageManagerFallbackStrategy
storageManagerFallbackStrategy;
- private Cache<ShuffleDataFlushEvent, StorageManager> storageManagerCache;
+ private final Cache<ShuffleDataFlushEvent, StorageManager>
eventOfUnderStorageManagers;
MultiStorageManager(ShuffleServerConf conf) {
warmStorageManager = new LocalStorageManager(conf);
@@ -57,7 +55,7 @@ public class MultiStorageManager implements StorageManager {
throw new RuntimeException("Load fallback strategy failed.", e);
}
long cacheTimeout =
conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
- storageManagerCache = CacheBuilder.newBuilder()
+ eventOfUnderStorageManagers = CacheBuilder.newBuilder()
.expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
.build();
}
@@ -99,51 +97,32 @@ public class MultiStorageManager implements StorageManager {
throw new UnsupportedOperationException();
}
- @Override
- public boolean write(Storage storage, ShuffleWriteHandler handler,
ShuffleDataFlushEvent event) {
- StorageManager storageManager = selectStorageManager(event);
- if (event.getRetryTimes() > 0) {
- try {
- StorageManager newStorageManager =
storageManagerFallbackStrategy.tryFallback(
- storageManager, event, warmStorageManager, coldStorageManager);
- if (newStorageManager != storageManager) {
- storageManager = newStorageManager;
- storageManagerCache.put(event, storageManager);
- CreateShuffleWriteHandlerRequest request =
storage.getCreateWriterHandlerRequest(
- event.getAppId(), event.getShuffleId(),
event.getStartPartition());
- storage = storageManager.selectStorage(event);
- handler = storage.getOrCreateWriteHandler(request);
- }
- } catch (IOException ioe) {
- LOG.warn("Create fallback write handler failed ", ioe);
- }
- }
- boolean success = storageManager.write(storage, handler, event);
- if (success) {
- storageManagerCache.invalidate(event);
- }
- return success;
- }
-
private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
- StorageManager storageManager = storageManagerCache.getIfPresent(event);
- if (storageManager != null) {
- return storageManager;
- }
+ StorageManager storageManager;
if (event.getSize() > flushColdStorageThresholdSize) {
storageManager = coldStorageManager;
} else {
storageManager = warmStorageManager;
}
- if (!storageManager.canWrite(event)) {
+ if (!storageManager.canWrite(event) || event.getRetryTimes() > 0) {
storageManager = storageManagerFallbackStrategy.tryFallback(
storageManager, event, warmStorageManager, coldStorageManager);
}
- storageManagerCache.put(event, storageManager);
+ eventOfUnderStorageManagers.put(event, storageManager);
+ event.addCleanupCallback(() ->
eventOfUnderStorageManagers.invalidate(event));
return storageManager;
}
+ @Override
+ public boolean write(Storage storage, ShuffleWriteHandler handler,
ShuffleDataFlushEvent event) {
+ StorageManager underStorageManager =
eventOfUnderStorageManagers.getIfPresent(event);
+ if (underStorageManager == null) {
+ return false;
+ }
+ return underStorageManager.write(storage, handler, event);
+ }
+
public void start() {
}
@@ -174,4 +153,8 @@ public class MultiStorageManager implements StorageManager {
public StorageManager getColdStorageManager() {
return coldStorageManager;
}
+
+ public StorageManager getWarmStorageManager() {
+ return warmStorageManager;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 6219cb54..2acc49c4 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -104,7 +104,7 @@ public abstract class SingleStorageManager implements
StorageManager {
} else {
ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
}
- Storage storage = selectStorage(event);
+ Storage storage = event.getUnderStorage();
if (storage != null) {
storage.updateWriteMetrics(metrics);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 5f1bb47d..8d27f0f8 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -54,11 +54,14 @@ import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.HdfsTestBase;
import org.apache.uniffle.storage.common.AbstractStorage;
import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -360,9 +363,26 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId, int shuffleId, int startPartition, int endPartition,
Supplier<Boolean> isValid) {
+ return createShuffleDataFlushEvent(
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ isValid,
+ 1
+ );
+ }
+
+ public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ Supplier<Boolean> isValid,
+ int size) {
List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
return new ShuffleDataFlushEvent(ATOMIC_LONG.getAndIncrement(),
- appId, shuffleId, startPartition, endPartition, 1, spbs, isValid,
null);
+ appId, shuffleId, startPartition, endPartition, size, spbs, isValid,
null);
}
public static List<ShufflePartitionedBlock> createBlock(int num, int length)
{
@@ -416,6 +436,55 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(blocks.size(), matchNum);
}
+ @Test
+ public void fallbackWrittenWhenMultiStorageManagerEnableTest(@TempDir File
tempDir) throws InterruptedException {
+
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
10000L);
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE_HDFS.toString());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(tempDir.getAbsolutePath()));
+ shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
+
shuffleServerConf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+ LocalStorageManagerFallbackStrategy.class.getCanonicalName());
+
+ StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ String remoteStorage = "test";
+ String appId = "fallbackWrittenWhenMultiStorageManagerEnableTest";
+ storageManager.registerRemoteStorage(appId, new
RemoteStorageInfo(remoteStorage));
+
+ ShuffleFlushManager flushManager = new ShuffleFlushManager(
+ shuffleServerConf,
+ "shuffle-server-id",
+ mockShuffleServer,
+ storageManager
+ );
+
+ // case1: normally written to local storage
+ ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1,
null, 100);
+ flushManager.addToFlushQueue(event);
+ Thread.sleep(1000);
+ assertTrue(event.getUnderStorage() instanceof LocalStorage);
+
+ // case2: huge event is written to cold storage directly
+ event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
+ flushManager.addToFlushQueue(event);
+ Thread.sleep(1000);
+ assertTrue(event.getUnderStorage() instanceof HdfsStorage);
+ assertEquals(0, event.getRetryTimes());
+
+ // case3: local disk is full or corrupted, fallback to HDFS
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, null)
+ );
+ ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1,
1, 100, blocks, null, null);
+
bigEvent.setUnderStorage(((MultiStorageManager)storageManager).getWarmStorageManager().selectStorage(event));
+
((MultiStorageManager)storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent,
0);
+
+ event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+ flushManager.addToFlushQueue(event);
+ Thread.sleep(1000);
+ assertTrue(event.getUnderStorage() instanceof HdfsStorage);
+ assertEquals(1, event.getRetryTimes());
+ }
+
@Test
public void processPendingEventsTest(@TempDir File tempDir) {
try {
@@ -427,7 +496,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
- ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1,1,
100, null, null, null);
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1,
100, null, null, null);
assertEquals(0, manager.getPendingEventsSize());
manager.addPendingEvents(event);
Thread.sleep(1000);
@@ -435,9 +504,12 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
do {
Thread.sleep(1 * 1000);
} while (manager.getEventNumInFlush() != 0);
+
List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new
ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1,
1, 100, blocks, null, null);
+ bigEvent.setUnderStorage(storageManager.selectStorage(event));
storageManager.updateWriteMetrics(bigEvent, 0);
+
manager.addPendingEvents(event);
manager.addPendingEvents(event);
manager.addPendingEvents(event);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 6276dc23..52629f40 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -90,7 +90,6 @@ public class ShuffleServerMetricsTest {
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
JsonNode metricsNode = actualObj.get("metrics");
- assertEquals(49, metricsNode.size());
List<String> expectedMetricNames = Lists.newArrayList(
ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE_PREFIX + STORAGE_HOST,
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index 1c6075aa..d385e933 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -57,7 +57,7 @@ public class MultiStorageManagerTest {
}
@Test
- public void selectStorageManagerIfCanNotWriteTest() {
+ public void underStorageManagerSelectionTest() {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
@@ -69,7 +69,21 @@ public class MultiStorageManagerTest {
String remoteStorage = "test";
String appId = "selectStorageManagerIfCanNotWriteTest_appId";
manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
- List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new
ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+
+ /**
+ * case1: big event should be written into cold storage directly
+ */
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(10001, 1000, 1, 1, 1L, null)
+ );
+ ShuffleDataFlushEvent hugeEvent = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 10001, blocks, null, null);
+ assertTrue(manager.selectStorage(hugeEvent) instanceof HdfsStorage);
+
+ /**
+ * case2: fallback when disk can not write
+ */
+ blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1,
1L, null));
ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
1, appId, 1, 1, 1, 1000, blocks, null, null);
Storage storage = manager.selectStorage(event);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 4032f3b1..185ce58f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -331,6 +331,12 @@ public class LocalStorage extends AbstractStorage {
return appIds;
}
+ // Only for test
+ @VisibleForTesting
+ public void markSpaceFull() {
+ isSpaceEnough = false;
+ }
+
public static class Builder {
private long capacity;
private double lowWaterMarkOfWrite;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index 7aac7d0f..a2d96940 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -90,7 +90,7 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
shuffleIndexResult =
shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult();
} catch (Exception e) {
throw new RssException("Failed to read shuffle index for appId[" + appId
+ "], shuffleId["
- + shuffleId + "], partitionId[" + partitionId + "] due to " +
e.getMessage());
+ + shuffleId + "], partitionId[" + partitionId + "]", e);
}
return shuffleIndexResult;
}