This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c09cbd1b [ISSUE-380] Refactor the flush process to fix fallback fail  
(#383)
c09cbd1b is described below

commit c09cbd1b707c3c4b3edfedb4e1db2ecba9c17287
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 6 15:18:06 2022 +0800

    [ISSUE-380] Refactor the flush process to fix fallback fail  (#383)
    
    ### What changes were proposed in this pull request?
    
    1. Avoid partial events being pushed into the pending queue when using 
`MultipleStorageManager`
    2. Introduce the cleanupCallback of `ShuffleDataFlushEvent` to scope all 
cleanup operations within the event lifecycle.
    3. Bind the concrete under storage to the data-flush event to avoid 
invoking the `selectStorage` method again after writing.
    4. Refactor the whole flush process into a `while` loop, making it simpler 
and clearer.
    5. Add the metric of `total_failed_written_event_num`
    
    ### Why are the changes needed?
    
    To fix the invalid fallback when a data-flush event enters the pending queue
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    1. Simple UTs
    2. Integration tests
---
 ...st.java => MultiStorageFaultToleranceBase.java} |  39 ++---
 .../uniffle/test/MultiStorageHdfsFallbackTest.java |  56 ++++++++
 .../test/MultiStorageLocalfileFallbackTest.java    |  66 +++++++++
 .../client/factory/ShuffleServerClientFactory.java |   6 +
 .../uniffle/server/ShuffleDataFlushEvent.java      |  47 ++++++
 .../apache/uniffle/server/ShuffleFlushManager.java | 157 +++++++++++----------
 .../uniffle/server/ShuffleServerMetrics.java       |   3 +
 .../uniffle/server/buffer/ShuffleBuffer.java       |   1 +
 .../server/buffer/ShuffleBufferManager.java        |   1 +
 .../uniffle/server/storage/HdfsStorageManager.java |   4 +-
 .../server/storage/LocalStorageManager.java        |   1 +
 .../server/storage/MultiStorageManager.java        |  55 +++-----
 .../server/storage/SingleStorageManager.java       |   2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    |  76 +++++++++-
 .../uniffle/server/ShuffleServerMetricsTest.java   |   1 -
 .../server/storage/MultiStorageManagerTest.java    |  18 ++-
 .../uniffle/storage/common/LocalStorage.java       |   6 +
 .../handler/impl/LocalFileClientReadHandler.java   |   2 +-
 18 files changed, 392 insertions(+), 149 deletions(-)

diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
similarity index 76%
rename from 
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
rename to 
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
index 6e0145d8..acdbf23d 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,11 +26,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -44,37 +43,17 @@ import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
+public abstract class MultiStorageFaultToleranceBase extends 
ShuffleReadWriteBase {
   private ShuffleServerGrpcClient shuffleServerClient;
   private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";
 
-  @BeforeAll
-  public static void setupServers() throws Exception {
-    final CoordinatorConf coordinatorConf = getCoordinatorConf();
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    String basePath = generateBasePath();
-    shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
-    shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 
100.0);
-    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 
100);
-    shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 
30L);
-    shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS, 
5000L);
-    
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
 60L * 1000L * 60L);
-    shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 
1000L);
-    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.name());
-    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(basePath));
-    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
400L * 1024L * 1024L);
-    createAndStartServers(shuffleServerConf, coordinatorConf);
-  }
-
   @BeforeEach
   public void createClient() {
+    ShuffleServerClientFactory.getInstance().cleanupCache();
     shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, 
SHUFFLE_SERVER_PORT);
   }
 
@@ -83,19 +62,19 @@ public class MultiStorageFaultToleranceTest extends 
ShuffleReadWriteBase {
     shuffleServerClient.close();
   }
 
+  abstract void makeChaos();
+
   @Test
-  public void hdfsFallbackTest() throws Exception {
-    String appId = "fallback_test";
+  public void fallbackTest() throws Exception {
+    String appId = "fallback_test_" + this.getClass().getSimpleName();
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Map<Integer, List<Integer>> map = Maps.newHashMap();
     map.put(0, Lists.newArrayList(0));
     registerShuffle(appId, map);
     Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
     final List<ShuffleBlockInfo> blocks = createShuffleBlockList(
-        0, 0, 0, 40, 10 * 1024 * 1024, blockBitmap, expectedData);
-    assertEquals(1, cluster.getDataNodes().size());
-    cluster.stopDataNode(0);
-    assertEquals(0, cluster.getDataNodes().size());
+        0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
+    makeChaos();
     sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks);
     validateResult(appId, 0, 0, blockBitmap, 
Roaring64NavigableMap.bitmapOf(0), expectedData);
   }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
new file mode 100644
index 00000000..63ec7ac7
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageHdfsFallbackTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MultiStorageHdfsFallbackTest extends 
MultiStorageFaultToleranceBase {
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    final CoordinatorConf coordinatorConf = getCoordinatorConf();
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    String basePath = generateBasePath();
+    shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
+    shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 
100.0);
+    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 
100);
+    shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 
30L);
+    shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS, 
5000L);
+    
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
 60L * 1000L * 60L);
+    shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 
1000L);
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.name());
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(basePath));
+    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
1L * 1024L * 1024L);
+    createAndStartServers(shuffleServerConf, coordinatorConf);
+  }
+
+  @Override
+  public void makeChaos() {
+    assertEquals(1, cluster.getDataNodes().size());
+    cluster.stopDataNode(0);
+    assertEquals(0, cluster.getDataNodes().size());
+  }
+}
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
new file mode 100644
index 00000000..7d70d017
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageLocalfileFallbackTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.storage.LocalStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.MultiStorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class MultiStorageLocalfileFallbackTest extends 
MultiStorageFaultToleranceBase {
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    final CoordinatorConf coordinatorConf = getCoordinatorConf();
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    String basePath = generateBasePath();
+    shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
+    shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 
100.0);
+    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 
100);
+    shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 
30L);
+    shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS, 
5000L);
+    
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
 60L * 1000L * 60L);
+    shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 
1000L);
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.name());
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(basePath));
+    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
1000L * 1024L * 1024L);
+    shuffleServerConf.setString(
+        ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+        LocalStorageManagerFallbackStrategy.class.getCanonicalName()
+    );
+    createAndStartServers(shuffleServerConf, coordinatorConf);
+  }
+
+  @Override
+  public void makeChaos() {
+    LocalStorageManager warmStorageManager =
+        (LocalStorageManager) 
((MultiStorageManager)shuffleServers.get(0).getStorageManager()).getWarmStorageManager();
+    for (Storage storage : warmStorageManager.getStorages()) {
+      LocalStorage localStorage = (LocalStorage) storage;
+      localStorage.markSpaceFull();
+    }
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 4337b855..6d82e126 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -59,4 +59,10 @@ public class ShuffleServerClientFactory {
     }
     return serverToClients.get(shuffleServerInfo);
   }
+
+  // Only for tests
+  public synchronized void cleanupCache() {
+    clients.values().stream().flatMap(x -> 
x.values().stream()).forEach(ShuffleServerClient::close);
+    this.clients = Maps.newConcurrentMap();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 1fe5ba9d..883ef14a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -17,14 +17,20 @@
 
 package org.apache.uniffle.server;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.storage.common.Storage;
 
 public class ShuffleDataFlushEvent {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ShuffleDataFlushEvent.class);
 
   private final long eventId;
   private final String appId;
@@ -37,6 +43,10 @@ public class ShuffleDataFlushEvent {
   private final ShuffleBuffer shuffleBuffer;
   private final AtomicInteger retryTimes = new AtomicInteger();
 
+  private boolean isPended = false;
+  private Storage underStorage;
+  private final List<Runnable> cleanupCallbackChains;
+
   public ShuffleDataFlushEvent(
       long eventId,
       String appId,
@@ -56,6 +66,7 @@ public class ShuffleDataFlushEvent {
     this.shuffleBlocks = shuffleBlocks;
     this.valid = valid;
     this.shuffleBuffer = shuffleBuffer;
+    this.cleanupCallbackChains = new ArrayList<>();
   }
 
   public List<ShufflePartitionedBlock> getShuffleBlocks() {
@@ -105,6 +116,42 @@ public class ShuffleDataFlushEvent {
     retryTimes.incrementAndGet();
   }
 
+  public boolean isPended() {
+    return isPended;
+  }
+
+  public void markPended() {
+    isPended = true;
+  }
+
+  public Storage getUnderStorage() {
+    return underStorage;
+  }
+
+  public void setUnderStorage(Storage underStorage) {
+    this.underStorage = underStorage;
+  }
+
+  public boolean doCleanup() {
+    boolean ret = true;
+    for (Runnable cleanupCallback : cleanupCallbackChains) {
+      try {
+        cleanupCallback.run();
+      } catch (Exception e) {
+        ret = false;
+        LOGGER.error("Errors doing cleanup callback. event: {}", this, e);
+      }
+    }
+    return ret;
+  }
+
+  public void addCleanupCallback(
+      Runnable cleanupCallback) {
+    if (cleanupCallback != null) {
+      cleanupCallbackChains.add(cleanupCallback);
+    }
+  }
+
   @Override
   public String toString() {
     return "ShuffleDataFlushEvent: eventId=" + eventId
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index b2a67853..367fdaff 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -40,7 +40,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ThreadUtils;
-import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -143,86 +143,100 @@ public class ShuffleFlushManager {
   }
 
   private void flushToFile(ShuffleDataFlushEvent event) {
-
-    Storage storage = storageManager.selectStorage(event);
-    if (storage != null && !storage.canWrite()) {
-      addPendingEvents(event);
-      return;
-    }
-
     long start = System.currentTimeMillis();
-    List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
     boolean writeSuccess = false;
-    try {
-      // storage info maybe null if the application cache was cleared already
-      if (storage != null) {
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: " + event);
-        } else if (!event.isValid()) {
-          //  avoid printing error log
+
+    while (event.getRetryTimes() <= retryMax) {
+      try {
+        if (!event.isValid()) {
           writeSuccess = true;
           LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
-        } else {
-          String user = StringUtils.defaultString(
-              
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-              StringUtils.EMPTY
-          );
-          CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
-                  storageType,
-                  event.getAppId(),
-                  event.getShuffleId(),
-                  event.getStartPartition(),
-                  event.getEndPartition(),
-                  storageBasePaths.toArray(new 
String[storageBasePaths.size()]),
-                  shuffleServerId,
-                  hadoopConf,
-                  storageDataReplica,
-                  user);
-          ShuffleWriteHandler handler = 
storage.getOrCreateWriteHandler(request);
-          do {
-            if (event.getRetryTimes() > retryMax) {
-              LOG.error("Failed to write data for " + event + " in " + 
retryMax + " times, shuffle data will be lost");
-              
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
-              break;
-            }
-            if (!event.isValid()) {
-              LOG.warn("AppId {} was removed already, event {} should be 
dropped, may leak one handler",
-                  event.getAppId(), event);
-              //  avoid printing error log
-              writeSuccess = true;
-              break;
-            }
+          break;
+        }
+
+        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+        if (blocks == null || blocks.isEmpty()) {
+          LOG.info("There is no block to be flushed: {}", event);
+          break;
+        }
 
-            writeSuccess = storageManager.write(storage, handler, event);
+        Storage storage = storageManager.selectStorage(event);
+        if (storage == null) {
+          break;
+        }
 
-            if (writeSuccess) {
-              updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
-              
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+        if (!storage.canWrite()) {
+          // todo: Could we add an interface supportPending for storageManager
+          //       to unify following logic of multiple different storage 
managers
+          if (storageManager instanceof MultiStorageManager) {
+            event.increaseRetryTimes();
+            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+            continue;
+          } else {
+            // To avoid being re-pushed to pending queue and make the server 
too much pressure,
+            // it's better to drop directly.
+            if (event.isPended()) {
+              LOG.error("Drop this event directly due to already having 
entered pending queue. event: {}", event);
               break;
-            } else {
-              event.increaseRetryTimes();
-              
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
             }
-          } while (event.getRetryTimes() <= retryMax);
+            event.increaseRetryTimes();
+            event.markPended();
+            addPendingEvents(event);
+            return;
+          }
         }
-      }
-    } catch (Exception e) {
-      // just log the error, don't throw the exception and stop the flush 
thread
-      LOG.error("Exception happened when process flush shuffle data for " + 
event, e);
-    } finally {
-      cleanupFlushEventData(event);
-      if (shuffleServer != null) {
-        long duration = System.currentTimeMillis() - start;
+
+        String user = StringUtils.defaultString(
+            
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+            StringUtils.EMPTY
+        );
+        CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
+            storageType,
+            event.getAppId(),
+            event.getShuffleId(),
+            event.getStartPartition(),
+            event.getEndPartition(),
+            storageBasePaths.toArray(new String[storageBasePaths.size()]),
+            shuffleServerId,
+            hadoopConf,
+            storageDataReplica,
+            user);
+        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+        writeSuccess = storageManager.write(storage, handler, event);
         if (writeSuccess) {
-          LOG.debug("Flush to file success in " + duration + " ms and release 
" + event.getSize() + " bytes");
+          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
+          
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+          break;
         } else {
-          LOG.error("Flush to file for " + event + " failed in "
-              + duration + " ms and release " + event.getSize() + " bytes");
+          event.increaseRetryTimes();
+          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
         }
+      } catch (Throwable throwable) {
+        // just log the error, don't throw the exception and stop the flush 
thread
+        LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
+        event.increaseRetryTimes();
+      }
+    }
+
+    if (event.getRetryTimes() > retryMax) {
+      LOG.error("Failed to write data for {} in {} times, shuffle data will be 
lost", event, retryMax);
+      if (event.getUnderStorage() != null) {
+        
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
       }
     }
-  }
 
+    event.doCleanup();
+    if (shuffleServer != null) {
+      long duration = System.currentTimeMillis() - start;
+      if (writeSuccess) {
+        LOG.debug("Flush to file success in {} ms and release {} bytes", 
duration, event.getSize());
+      } else {
+        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+        LOG.error("Flush to file for {} failed in {} ms and release {} bytes", 
event, duration, event.getSize());
+      }
+    }
+  }
+  
   private void updateCommittedBlockIds(String appId, int shuffleId, 
List<ShufflePartitionedBlock> blocks) {
     if (blocks == null || blocks.size() == 0) {
       return;
@@ -309,19 +323,12 @@ public class ShuffleFlushManager {
   }
 
   private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
-    ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
-    if (shuffleBuffer != null) {
-      shuffleBuffer.clearInFlushBuffer(event.getEventId());
-    }
-    if (shuffleServer != null) {
-      shuffleServer.getShuffleBufferManager().releaseMemory(
-          event.getSize(), true, false);
-    }
+    event.doCleanup();
   }
 
   private void dropPendingEvent(PendingShuffleFlushEvent event) {
     ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
-    cleanupFlushEventData(event.getEvent());
+    event.getEvent().doCleanup();
   }
 
   @VisibleForTesting
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 0bc8d903..60071ef3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -71,6 +71,7 @@ public class ShuffleServerMetrics {
   private static final String USED_BUFFER_SIZE = "used_buffer_size";
   private static final String TOTAL_UPLOAD_SIZE = "total_upload_size";
   private static final String TOTAL_UPLOAD_TIME_S = "total_upload_time_s";
+  private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM = 
"total_failed_written_event_num";
   private static final String TOTAL_DROPPED_EVENT_NUM = 
"total_dropped_event_num";
   private static final String TOTAL_HDFS_WRITE_DATA = "total_hdfs_write_data";
   private static final String TOTAL_LOCALFILE_WRITE_DATA = 
"total_localfile_write_data";
@@ -102,6 +103,7 @@ public class ShuffleServerMetrics {
   public static Counter counterTotalReadTime;
   public static Counter counterTotalUploadSize;
   public static Counter counterTotalUploadTimeS;
+  public static Counter counterTotalFailedWrittenEventNum;
   public static Counter counterTotalDroppedEventNum;
   public static Counter counterTotalHdfsWriteDataSize;
   public static Counter counterTotalLocalFileWriteDataSize;
@@ -251,6 +253,7 @@ public class ShuffleServerMetrics {
     counterTotalUploadSize = metricsManager.addCounter(TOTAL_UPLOAD_SIZE);
     counterTotalUploadTimeS = metricsManager.addCounter(TOTAL_UPLOAD_TIME_S);
     counterTotalDroppedEventNum = 
metricsManager.addCounter(TOTAL_DROPPED_EVENT_NUM);
+    counterTotalFailedWrittenEventNum = 
metricsManager.addCounter(TOTAL_FAILED_WRITTEN_EVENT_NUM);
     counterTotalHdfsWriteDataSize = 
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
     counterTotalLocalFileWriteDataSize = 
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
     counterTotalRequireBufferFailed = 
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 37e497a8..874bcaae 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -104,6 +104,7 @@ public class ShuffleBuffer {
         spBlocks,
         isValid,
         this);
+    event.addCleanupCallback(() -> 
this.clearInFlushBuffer(event.getEventId()));
     inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks.clear();
     size = 0;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c3606a08..b606d30c 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -218,6 +218,7 @@ public class ShuffleBufferManager {
             shuffleFlushManager.getDataDistributionType(appId)
         );
     if (event != null) {
+      event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, 
false));
       updateShuffleSize(appId, shuffleId, -event.getSize());
       inFlushSize.addAndGet(event.getSize());
       ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 2151e75d..09c7dd03 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -70,7 +70,9 @@ public class HdfsStorageManager extends SingleStorageManager {
 
   @Override
   public Storage selectStorage(ShuffleDataFlushEvent event) {
-    return getStorageByAppId(event.getAppId());
+    Storage storage = getStorageByAppId(event.getAppId());
+    event.setUnderStorage(storage);
+    return storage;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index db0f47e3..4e47a34f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -151,6 +151,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
     if (storage.isCorrupted()) {
       storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), 
event.getStartPartition());
     }
+    event.setUnderStorage(storage);
     return storage;
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index adb84369..ee53ff03 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.server.storage;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +34,6 @@ import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
-import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 
 public class MultiStorageManager implements StorageManager {
 
@@ -45,7 +43,7 @@ public class MultiStorageManager implements StorageManager {
   private final StorageManager coldStorageManager;
   private final long flushColdStorageThresholdSize;
  private AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy;
-  private Cache<ShuffleDataFlushEvent, StorageManager> storageManagerCache;
+  private final Cache<ShuffleDataFlushEvent, StorageManager> eventOfUnderStorageManagers;
 
   MultiStorageManager(ShuffleServerConf conf) {
     warmStorageManager = new LocalStorageManager(conf);
@@ -57,7 +55,7 @@ public class MultiStorageManager implements StorageManager {
       throw new RuntimeException("Load fallback strategy failed.", e);
     }
     long cacheTimeout = conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
-    storageManagerCache = CacheBuilder.newBuilder()
+    eventOfUnderStorageManagers = CacheBuilder.newBuilder()
         .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
         .build();
   }
@@ -99,51 +97,32 @@ public class MultiStorageManager implements StorageManager {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
-    StorageManager storageManager = selectStorageManager(event);
-    if (event.getRetryTimes() > 0) {
-      try {
-        StorageManager newStorageManager = storageManagerFallbackStrategy.tryFallback(
-                storageManager, event, warmStorageManager, coldStorageManager);
-        if (newStorageManager != storageManager) {
-          storageManager = newStorageManager;
-          storageManagerCache.put(event, storageManager);
-          CreateShuffleWriteHandlerRequest request = storage.getCreateWriterHandlerRequest(
-              event.getAppId(), event.getShuffleId(), event.getStartPartition());
-          storage = storageManager.selectStorage(event);
-          handler = storage.getOrCreateWriteHandler(request);
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Create fallback write handler failed ", ioe);
-      }
-    }
-    boolean success = storageManager.write(storage, handler, event);
-    if (success) {
-      storageManagerCache.invalidate(event);
-    }
-    return success;
-  }
-
   private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
-    StorageManager storageManager = storageManagerCache.getIfPresent(event);
-    if (storageManager != null) {
-      return storageManager;
-    }
+    StorageManager storageManager;
     if (event.getSize() > flushColdStorageThresholdSize) {
       storageManager = coldStorageManager;
     } else {
       storageManager = warmStorageManager;
     }
 
-    if (!storageManager.canWrite(event)) {
+    if (!storageManager.canWrite(event) || event.getRetryTimes() > 0) {
       storageManager = storageManagerFallbackStrategy.tryFallback(
           storageManager, event, warmStorageManager, coldStorageManager);
     }
-    storageManagerCache.put(event, storageManager);
+    eventOfUnderStorageManagers.put(event, storageManager);
+    event.addCleanupCallback(() -> eventOfUnderStorageManagers.invalidate(event));
     return storageManager;
   }
 
+  @Override
+  public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
+    StorageManager underStorageManager = eventOfUnderStorageManagers.getIfPresent(event);
+    if (underStorageManager == null) {
+      return false;
+    }
+    return underStorageManager.write(storage, handler, event);
+  }
+
   public void start() {
   }
 
@@ -174,4 +153,8 @@ public class MultiStorageManager implements StorageManager {
   public StorageManager getColdStorageManager() {
     return coldStorageManager;
   }
+
+  public StorageManager getWarmStorageManager() {
+    return warmStorageManager;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 6219cb54..2acc49c4 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -104,7 +104,7 @@ public abstract class SingleStorageManager implements 
StorageManager {
       } else {
         ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
       }
-      Storage storage = selectStorage(event);
+      Storage storage = event.getUnderStorage();
       if (storage != null) {
         storage.updateWriteMetrics(metrics);
       }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 5f1bb47d..8d27f0f8 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -54,11 +54,14 @@ import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 import org.apache.uniffle.storage.HdfsTestBase;
 import org.apache.uniffle.storage.common.AbstractStorage;
 import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -360,9 +363,26 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
 
   public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
       String appId, int shuffleId, int startPartition, int endPartition, 
Supplier<Boolean> isValid) {
+    return createShuffleDataFlushEvent(
+      appId,
+      shuffleId,
+        startPartition,
+        endPartition,
+        isValid,
+        1
+    );
+  }
+
+  public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      Supplier<Boolean> isValid,
+      int size) {
     List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
     return new ShuffleDataFlushEvent(ATOMIC_LONG.getAndIncrement(),
-        appId, shuffleId, startPartition, endPartition, 1, spbs, isValid, 
null);
+        appId, shuffleId, startPartition, endPartition, size, spbs, isValid, 
null);
   }
 
   public static List<ShufflePartitionedBlock> createBlock(int num, int length) 
{
@@ -416,6 +436,55 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(blocks.size(), matchNum);
   }
 
+  @Test
+  public void fallbackWrittenWhenMultiStorageManagerEnableTest(@TempDir File 
tempDir) throws InterruptedException {
+    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
10000L);
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.toString());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(tempDir.getAbsolutePath()));
+    shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
+    
shuffleServerConf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+        LocalStorageManagerFallbackStrategy.class.getCanonicalName());
+
+    StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    String remoteStorage = "test";
+    String appId = "fallbackWrittenWhenMultiStorageManagerEnableTest";
+    storageManager.registerRemoteStorage(appId, new 
RemoteStorageInfo(remoteStorage));
+
+    ShuffleFlushManager flushManager = new ShuffleFlushManager(
+        shuffleServerConf,
+        "shuffle-server-id",
+        mockShuffleServer,
+        storageManager
+    );
+
+    // case1: normally written to local storage
+    ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1, 
null, 100);
+    flushManager.addToFlushQueue(event);
+    Thread.sleep(1000);
+    assertTrue(event.getUnderStorage() instanceof LocalStorage);
+
+    // case2: huge event is written to cold storage directly
+    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
+    flushManager.addToFlushQueue(event);
+    Thread.sleep(1000);
+    assertTrue(event.getUnderStorage() instanceof HdfsStorage);
+    assertEquals(0, event.getRetryTimes());
+
+    // case3: local disk is full or corrupted, fallback to HDFS
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, null)
+    );
+    ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
+    bigEvent.setUnderStorage(((MultiStorageManager) storageManager).getWarmStorageManager().selectStorage(event));
+    ((MultiStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
+
+    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+    flushManager.addToFlushQueue(event);
+    Thread.sleep(1000);
+    assertTrue(event.getUnderStorage() instanceof HdfsStorage);
+    assertEquals(1, event.getRetryTimes());
+  }
+
   @Test
   public void processPendingEventsTest(@TempDir File tempDir) {
     try {
@@ -427,7 +496,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
           
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
       ShuffleFlushManager manager =
           new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
-      ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1,1, 100, null, null, null);
+      ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, null, null, null);
       assertEquals(0, manager.getPendingEventsSize());
       manager.addPendingEvents(event);
       Thread.sleep(1000);
@@ -435,9 +504,12 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
       do {
         Thread.sleep(1 * 1000);
       } while (manager.getEventNumInFlush() != 0);
+
       List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
       ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
+      bigEvent.setUnderStorage(storageManager.selectStorage(event));
       storageManager.updateWriteMetrics(bigEvent, 0);
+
       manager.addPendingEvents(event);
       manager.addPendingEvents(event);
       manager.addPendingEvents(event);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 6276dc23..52629f40 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -90,7 +90,6 @@ public class ShuffleServerMetricsTest {
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
     JsonNode metricsNode = actualObj.get("metrics");
-    assertEquals(49, metricsNode.size());
 
     List<String> expectedMetricNames = Lists.newArrayList(
         ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE_PREFIX + STORAGE_HOST,
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index 1c6075aa..d385e933 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -57,7 +57,7 @@ public class MultiStorageManagerTest {
   }
 
   @Test
-  public void selectStorageManagerIfCanNotWriteTest() {
+  public void underStorageManagerSelectionTest() {
     ShuffleServerConf conf = new ShuffleServerConf();
     conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
     conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
@@ -69,7 +69,21 @@ public class MultiStorageManagerTest {
     String remoteStorage = "test";
     String appId = "selectStorageManagerIfCanNotWriteTest_appId";
     manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
-    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new 
ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+
+    /**
+     * case1: big event should be written into cold storage directly
+     */
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(10001, 1000, 1, 1, 1L, null)
+    );
+    ShuffleDataFlushEvent hugeEvent = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 10001, blocks, null, null);
+    assertTrue(manager.selectStorage(hugeEvent) instanceof HdfsStorage);
+
+    /**
+     * case2: fallback when disk can not write
+     */
+    blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 
1L, null));
     ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
         1, appId, 1, 1, 1, 1000, blocks, null, null);
     Storage storage = manager.selectStorage(event);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 4032f3b1..185ce58f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -331,6 +331,12 @@ public class LocalStorage extends AbstractStorage {
     return appIds;
   }
 
+  // Only for test
+  @VisibleForTesting
+  public void markSpaceFull() {
+    isSpaceEnough = false;
+  }
+
   public static class Builder {
     private long capacity;
     private double lowWaterMarkOfWrite;
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index 7aac7d0f..a2d96940 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -90,7 +90,7 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
       shuffleIndexResult = 
shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult();
     } catch (Exception e) {
       throw new RssException("Failed to read shuffle index for appId[" + appId 
+ "], shuffleId["
-        + shuffleId + "], partitionId[" + partitionId + "] due to " + 
e.getMessage());
+        + shuffleId + "], partitionId[" + partitionId + "]", e);
     }
     return shuffleIndexResult;
   }


Reply via email to