This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 493bf19  [Improvement] LocalStorage init use multi thread  (#72)
493bf19 is described below

commit 493bf19e5b6f9bcfc4fc742e2356d259f8200ee7
Author: xianjingfeng <[email protected]>
AuthorDate: Fri Jul 29 15:56:59 2022 +0800

    [Improvement] LocalStorage init use multi thread  (#72)
    
    ### **What changes were proposed in this pull request?**
    Solves issue #71: use multiple threads to clean local storage.
    
    ### **Why are the changes needed?**
    If the shuffle server exits abnormally, many files need to be cleared when
    the shuffle server starts again, and this operation costs a lot of time.
    
    ### **Does this PR introduce any user-facing change?**
    No
    
    ### **How was this patch tested?**
    Added a new unit test.
---
 .../server/storage/LocalStorageManager.java        | 57 ++++++++++++++++----
 .../server/storage/LocalStorageManagerTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 9 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4dde90f..ee42b58 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -17,13 +17,20 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.util.RssUtils;
@@ -42,8 +49,9 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class LocalStorageManager extends SingleStorageManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalStorageManager.class);
 
-  private final List<LocalStorage> localStorages = Lists.newArrayList();
+  private final List<LocalStorage> localStorages;
   private final String[] storageBasePaths;
   private final LocalStorageChecker checker;
   private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
@@ -63,15 +71,46 @@ public class LocalStorageManager extends 
SingleStorageManager {
     if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
       throw new IllegalArgumentException("highWaterMarkOfWrite must be larger 
than lowWaterMarkOfWrite");
     }
-    for (String storagePath : storageBasePaths) {
-      localStorages.add(LocalStorage.newBuilder()
-          .basePath(storagePath)
-          .capacity(capacity)
-          .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
-          .highWaterMarkOfWrite(highWaterMarkOfWrite)
-          .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
-          .build());
+
+    // We must make sure the order of `storageBasePaths` and `localStorages` 
is same, or some unit test may be fail
+    CountDownLatch countDownLatch = new 
CountDownLatch(storageBasePaths.length);
+    AtomicInteger successCount = new AtomicInteger();
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    LocalStorage[] localStorageArray = new 
LocalStorage[storageBasePaths.length];
+    for (int i = 0; i < storageBasePaths.length; i++) {
+      final int idx = i;
+      String storagePath = storageBasePaths[i];
+      executorService.submit(() -> {
+        try {
+          localStorageArray[idx] = LocalStorage.newBuilder()
+              .basePath(storagePath)
+              .capacity(capacity)
+              .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
+              .highWaterMarkOfWrite(highWaterMarkOfWrite)
+              .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
+              .build();
+          successCount.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error("LocalStorage init failed!", e);
+        } finally {
+          countDownLatch.countDown();
+        }
+      });
+    }
+
+    try {
+      countDownLatch.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
+
+    executorService.shutdown();
+    int failedCount = storageBasePaths.length - successCount.get();
+    if (failedCount > 0) {
+      throw new RuntimeException(String.format("[%s] local storage init 
failed!", failedCount));
+    }
+
+    localStorages = Arrays.asList(localStorageArray);
     this.checker = new LocalStorageChecker(conf, localStorages);
   }
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
new file mode 100644
index 0000000..477b607
--- /dev/null
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.util.StorageType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class LocalStorageManagerTest {
+
+  private static LocalStorageManager localStorageManager;
+  private static String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};
+
+  @BeforeAll
+  public static void prepare() {
+    ShuffleServerMetrics.register();
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, String.join(",", 
storagePaths));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    localStorageManager = new LocalStorageManager(conf);
+  }
+
+  @AfterAll
+  public static void clear() {
+    ShuffleServerMetrics.clear();
+  }
+
+  @Test
+  public void testInitLocalStorageManager() {
+    List<LocalStorage> storages = localStorageManager.getStorages();
+    assertNotNull(storages);
+    assertTrue(storages.size() == storagePaths.length);
+    for (int i = 0; i < storagePaths.length; i++) {
+      assertTrue(storagePaths[i].equals(storages.get(i).getBasePath()));
+    }
+  }
+}
+

Reply via email to