This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ee1688c8 [#1428] fix(server): fallback invalid when local storage 
can't write (#1429)
3ee1688c8 is described below

commit 3ee1688c828c26d60f171ddf54b701beaaf0dfaf
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 17 13:45:47 2024 +0800

    [#1428] fix(server): fallback invalid when local storage can't write (#1429)
    
    ### What changes were proposed in this pull request?
    
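    Allow a negative fallback threshold for
    `rss.server.hybrid.storage.fallback.max.fail.times` (the non-negative
    validator is removed). In addition, the Hadoop, local and rotate fallback
    strategies now fall back to the next storage manager whenever the current
    one cannot write the event, so events are no longer discarded when the
    local storage is unwritable.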
    
    ### Why are the changes needed?
    
    Fix: #1428
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
---
 .../apache/uniffle/server/ShuffleServerConf.java   |  2 -
 .../HadoopStorageManagerFallbackStrategy.java      |  2 +-
 .../LocalStorageManagerFallbackStrategy.java       |  2 +-
 .../RotateStorageManagerFallbackStrategy.java      |  3 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 13 +++---
 .../server/storage/HybridStorageManagerTest.java   | 48 ++++++++++++++++++++++
 6 files changed, 58 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 77287b2a7..f968a1722 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -352,8 +352,6 @@ public class ShuffleServerConf extends RssBaseConf {
   public static final ConfigOption<Long> FALLBACK_MAX_FAIL_TIMES =
       ConfigOptions.key("rss.server.hybrid.storage.fallback.max.fail.times")
           .longType()
-          .checkValue(
-              ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, " fallback times must 
be non-negative")
           .defaultValue(0L)
           .withDescription("For hybrid storage, fail times exceed the number, 
will switch storage")
           
.withDeprecatedKeys("rss.server.multistorage.fallback.max.fail.times");
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
index 48a0858ee..e569578db 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
@@ -37,7 +37,7 @@ public class HadoopStorageManagerFallbackStrategy extends 
AbstractStorageManager
   @Override
   public StorageManager tryFallback(
       StorageManager current, ShuffleDataFlushEvent event, StorageManager... 
candidates) {
-    if (event.getRetryTimes() > fallBackTimes) {
+    if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) {
       return findNextStorageManager(current, excludeTypes, event, candidates);
     }
     return current;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
index d506fe97d..565fa2b3c 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
@@ -37,7 +37,7 @@ public class LocalStorageManagerFallbackStrategy extends 
AbstractStorageManagerF
   @Override
   public StorageManager tryFallback(
       StorageManager current, ShuffleDataFlushEvent event, StorageManager... 
candidates) {
-    if (event.getRetryTimes() > fallBackTimes) {
+    if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) {
       return findNextStorageManager(current, excludeTypes, event, candidates);
     }
     return current;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
index 0f61643d9..623438f20 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
@@ -32,7 +32,8 @@ public class RotateStorageManagerFallbackStrategy extends 
AbstractStorageManager
   public StorageManager tryFallback(
       StorageManager current, ShuffleDataFlushEvent event, StorageManager... 
candidates) {
     if (fallBackTimes > 0
-        && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % 
fallBackTimes > 0)) {
+        && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % 
fallBackTimes > 0)
+        && current.canWrite(event)) {
       return current;
     }
     return findNextStorageManager(current, null, event, candidates);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ec767bc63..f2b0b3048 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -697,16 +697,15 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     List<ShufflePartitionedBlock> blocks =
         Lists.newArrayList(new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, 
(byte[]) null));
     ShuffleDataFlushEvent bigEvent =
-        new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
+        new ShuffleDataFlushEvent(1, "1", 2, 1, 1, 100, blocks, null, null);
     bigEvent.setUnderStorage(
         ((HybridStorageManager) 
storageManager).getWarmStorageManager().selectStorage(event));
     ((HybridStorageManager) 
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
 
-    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+    event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
     flushManager.addToFlushQueue(event);
     Thread.sleep(1000);
     assertTrue(event.getUnderStorage() instanceof HadoopStorage);
-    assertEquals(1, event.getRetryTimes());
   }
 
   @Test
@@ -756,11 +755,11 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
         ((HybridStorageManager) 
storageManager).getWarmStorageManager().selectStorage(event));
     ((HybridStorageManager) 
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
 
-    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+    event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
     flushManager.addToFlushQueue(event);
-    waitForFlush(flushManager, appId, 1, 15);
-    assertEquals(1, event.getRetryTimes());
-    assertEquals(2, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+    waitForFlush(flushManager, appId, 2, 5);
+    assertEquals(0, event.getRetryTimes());
+    assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
     assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
   }
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
index d3e12a284..7fe6f3bc0 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
@@ -36,6 +36,54 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HybridStorageManagerTest {
 
+  /**
+   * Tests the fallback strategy when the local storage is corrupted and cannot be written. 1. When
+   * the fallback max fail times is negative (-1), the event is taken by Hadoop storage. 2. When
+   * the fallback max fail times is 0, the event is still taken by Hadoop storage instead of being
+   * discarded.
+   */
+  @Test
+  public void fallbackTestWhenLocalStorageCorrupted() {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    conf.setString(
+        ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS,
+        
"org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector");
+    conf.setString(
+        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+        
"org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy");
+
+    // case1: fallback to hadoop storage when fallback_max_fail_time = -1
+    conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, -1);
+    HybridStorageManager manager = new HybridStorageManager(conf);
+
+    LocalStorageManager localStorageManager = (LocalStorageManager) 
manager.getWarmStorageManager();
+    localStorageManager.getStorages().get(0).markCorrupted();
+
+    String remoteStorage = "test";
+    String appId = "selectStorageManagerWithSelectorAndFallbackStrategy_appId";
+    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    List<ShufflePartitionedBlock> blocks =
+        Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, 
(byte[]) null));
+    ShuffleDataFlushEvent event =
+        new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null);
+    assertTrue((manager.selectStorage(event) instanceof HadoopStorage));
+
+    // case2: fallback is still valid when fallback_max_fail_time = 0
+    conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 0);
+    manager = new HybridStorageManager(conf);
+
+    localStorageManager = (LocalStorageManager) 
manager.getWarmStorageManager();
+    localStorageManager.getStorages().get(0).markCorrupted();
+
+    event = new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, 
null);
+    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    assertTrue((manager.selectStorage(event) instanceof HadoopStorage));
+  }
+
   @Test
   public void selectStorageManagerTest() {
     ShuffleServerConf conf = new ShuffleServerConf();

Reply via email to