This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f21d2a  [Improvement] Ignore partial failure on initializing local 
storage in shuffle server side (#102)
5f21d2a is described below

commit 5f21d2abcc01490d9ec6fd650a3ca7a2af71ff47
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jul 29 18:54:15 2022 +0800

    [Improvement] Ignore partial failure on initializing local storage in 
shuffle server side (#102)
    
    ### What changes were proposed in this pull request?
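    Tolerate partial failures when initializing local storage on the shuffle
    server side. The number of storage paths allowed to fail is controlled by
    the new option rss.server.localstorage.initialize.max.fail.number.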
    
    ### Why are the changes needed?
    When multiple storage paths are configured and even one disk has
    insufficient capacity, the shuffle server immediately throws an exception.
    We want such partial failures to be ignored, because bad disks are common
    in production environments.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests (UT) in LocalStorageManagerTest
---
 .../apache/uniffle/server/ShuffleServerConf.java   | 17 +++--
 .../server/storage/LocalStorageManager.java        | 27 ++++++--
 .../server/storage/LocalStorageManagerTest.java    | 76 ++++++++++++++++++----
 .../uniffle/storage/common/LocalStorage.java       |  2 +-
 4 files changed, 98 insertions(+), 24 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 2c54655..c0cca48 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -271,11 +271,18 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("For multistorage, fail times exceed the number, will 
switch storage");
 
   public static final ConfigOption<List<String>> TAGS = ConfigOptions
-          .key("rss.server.tags")
-          .stringType()
-          .asList()
-          .noDefaultValue()
-          .withDescription("Tags list supported by shuffle server");
+      .key("rss.server.tags")
+      .stringType()
+      .asList()
+      .noDefaultValue()
+      .withDescription("Tags list supported by shuffle server");
+
+  public static final ConfigOption<Long> 
LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER = ConfigOptions
+      .key("rss.server.localstorage.initialize.max.fail.number")
+      .longType()
+      .checkValue(ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, " max fail times 
must be non-negative")
+      .defaultValue(0L)
+      .withDescription("For localstorage, it will exit when the failed 
initialized local storage exceed the number");
 
   public ShuffleServerConf() {
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index ee42b58..5b05dbe 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -19,12 +19,15 @@ package org.apache.uniffle.server.storage;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -48,6 +51,8 @@ import 
org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static 
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER;
+
 public class LocalStorageManager extends SingleStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalStorageManager.class);
 
@@ -57,6 +62,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
   private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
   private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
 
+  @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
     super(conf);
     String storageBasePathStr = 
conf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
@@ -101,16 +107,24 @@ public class LocalStorageManager extends 
SingleStorageManager {
     try {
       countDownLatch.await();
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      LOG.error("Failed to wait initializing local storage.", e);
     }
-
     executorService.shutdown();
+
     int failedCount = storageBasePaths.length - successCount.get();
-    if (failedCount > 0) {
-      throw new RuntimeException(String.format("[%s] local storage init 
failed!", failedCount));
+    long maxFailedNumber = 
conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER);
+    if (failedCount > maxFailedNumber || successCount.get() == 0) {
+      throw new RuntimeException(
+          String.format("Initialize %s local storage(s) failed, "
+              + "specified local storage paths size: %s, the conf of %s size: 
%s",
+              failedCount, localStorageArray.length, 
LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(), maxFailedNumber)
+      );
     }
-
-    localStorages = Arrays.asList(localStorageArray);
+    localStorages = 
Arrays.stream(localStorageArray).filter(Objects::nonNull).collect(Collectors.toList());
+    LOG.info(
+        "Succeed to initialize storage paths: {}",
+        
StringUtils.join(localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList()))
+    );
     this.checker = new LocalStorageChecker(conf, localStorages);
   }
 
@@ -131,7 +145,6 @@ public class LocalStorageManager extends 
SingleStorageManager {
     return storage;
   }
 
-
   @Override
   public Storage selectStorage(ShuffleDataReadEvent event) {
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index 477b607..ee7d412 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -17,32 +17,31 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.StorageType;
+
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
 
+/**
+ * The class is to test the {@link LocalStorageManager}
+ */
 public class LocalStorageManagerTest {
 
-  private static LocalStorageManager localStorageManager;
-  private static String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};
-
   @BeforeAll
   public static void prepare() {
     ShuffleServerMetrics.register();
-    ShuffleServerConf conf = new ShuffleServerConf();
-    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, String.join(",", 
storagePaths));
-    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
-    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
-    localStorageManager = new LocalStorageManager(conf);
   }
 
   @AfterAll
@@ -52,6 +51,14 @@ public class LocalStorageManagerTest {
 
   @Test
   public void testInitLocalStorageManager() {
+    String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, String.join(",", 
storagePaths));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+
     List<LocalStorage> storages = localStorageManager.getStorages();
     assertNotNull(storages);
     assertTrue(storages.size() == storagePaths.length);
@@ -59,5 +66,52 @@ public class LocalStorageManagerTest {
       assertTrue(storagePaths[i].equals(storages.get(i).getBasePath()));
     }
   }
-}
 
+  @Test
+  public void testInitializeLocalStorage() throws IOException {
+
+    // case1: when no candidates, it should throw exception.
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
"/a/rss-data,/b/rss-data");
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 
1);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    try {
+      LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+      fail();
+    } catch (Exception e) {
+      // ignore
+    }
+
+    // case2: when candidates exist, it should initialize successfully.
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
"/a/rss-data,/tmp/rss-data-1");
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+    assertEquals(1, localStorageManager.getStorages().size());
+
+    // case3: all ok
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
"/tmp/rss-data-1,/tmp/rss-data-2");
+    localStorageManager = new LocalStorageManager(conf);
+    assertEquals(2, localStorageManager.getStorages().size());
+
+    // case4: only have 1 candidates, but exceed the number of 
rss.server.localstorage.initialize.max.fail.number
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
"/a/rss-data,/tmp/rss-data-1");
+    conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 
0L);
+    try {
+      localStorageManager = new LocalStorageManager(conf);
+      fail();
+    } catch (Exception e) {
+      // ignore
+    }
+
+    // case5: if failed=2, but lower than 
rss.server.localstorage.initialize.max.fail.number, should exit
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
"/a/rss-data,/b/rss-data-1");
+    conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 
10L);
+    try {
+      localStorageManager = new LocalStorageManager(conf);
+      fail();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 117c347..0fce345 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -77,7 +77,7 @@ public class LocalStorage extends AbstractStorage {
     }
     long freeSpace = baseFolder.getFreeSpace();
     if (freeSpace < capacity) {
-      throw new IllegalArgumentException("Disk Available Capacity " + freeSpace
+      throw new IllegalArgumentException("The Disk of " + basePath + " 
Available Capacity " + freeSpace
           + " is smaller than configuration");
     }
   }

Reply via email to