This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b4adaa50b [#1209] improvement(server): Speed up 
cleanupStorageSelectionCache method in LocalStorageManager. (#1210)
b4adaa50b is described below

commit b4adaa50b7474330ed7723bba3976e090b8bd022
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon Sep 25 13:49:06 2023 +0800

    [#1209] improvement(server): Speed up cleanupStorageSelectionCache method 
in LocalStorageManager. (#1210)
    
    ### What changes were proposed in this pull request?
    
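    Optimize the cleanupStorageSelectionCache method in LocalStorageManager: store the
    partition-to-storage selection cache in a key-sorted ConcurrentSkipListMap and delete entries
    starting from the purged app/shuffle key prefix (via tailMap), instead of scanning the whole
    map with removeIf on every purge event.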
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1209
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
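    Existing UTs, plus ShufflePurgeEvent cases added to ShuffleFlushManagerTest, which also
    asserts that the storage selection cache is empty after all purges.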
---
 .../server/storage/LocalStorageManager.java        | 44 ++++++++++++++++------
 .../uniffle/server/ShuffleFlushManagerTest.java    | 20 ++++++++--
 2 files changed, 50 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 0752abcc0..ea47ae1ad 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -22,12 +22,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +52,6 @@ import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.storage.StorageStatus;
-import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.Checker;
@@ -81,7 +82,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
   private final List<String> storageBasePaths;
   private final LocalStorageChecker checker;
 
-  private final Map<String, LocalStorage> partitionsOfStorage;
+  private final ConcurrentSkipListMap<String, LocalStorage> 
sortedPartitionsOfStorageMap;
   private final List<StorageMediaProvider> typeProviders = 
Lists.newArrayList();
 
   @VisibleForTesting
@@ -91,7 +92,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
     if (CollectionUtils.isEmpty(storageBasePaths)) {
       throw new IllegalArgumentException("Base path dirs must not be empty");
     }
-    this.partitionsOfStorage = JavaUtils.newConcurrentMap();
+    this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>();
     long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
     double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
     double highWaterMarkOfWrite = 
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
@@ -182,7 +183,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
     int partitionId = event.getStartPartition();
 
     LocalStorage storage =
-        partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, 
partitionId));
+        sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId, 
partitionId));
     if (storage != null) {
       if (storage.isCorrupted()) {
         if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
@@ -210,7 +211,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
     final LocalStorage selectedStorage =
         candidates.get(
             ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, 
shuffleId, partitionId));
-    return partitionsOfStorage.compute(
+    return sortedPartitionsOfStorageMap.compute(
         UnionKey.buildKey(appId, shuffleId, partitionId),
         (key, localStorage) -> {
           // If this is the first time to select storage or existing storage 
is corrupted,
@@ -231,7 +232,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
     int shuffleId = event.getShuffleId();
     int partitionId = event.getStartPartition();
 
-    return partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, 
partitionId));
+    return sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, 
shuffleId, partitionId));
   }
 
   @Override
@@ -301,24 +302,56 @@ public class LocalStorageManager extends SingleStorageManager {
 
+  /**
+   * Drops the cached partition-to-storage selections of the app or shuffles named by {@code
+   * event}.
+   *
+   * @throws RssException if {@code event} is neither an AppPurgeEvent nor a ShufflePurgeEvent
+   */
   private void cleanupStorageSelectionCache(PurgeEvent event) {
-    Function<String, Boolean> deleteConditionFunc = null;
+    long startTime = System.currentTimeMillis();
+    String appId = event.getAppId();
     if (event instanceof AppPurgeEvent) {
-      deleteConditionFunc =
-          partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId());
+      deleteElement(
+          UnionKey.buildKey(appId),
+          partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, appId));
     } else if (event instanceof ShufflePurgeEvent) {
-      deleteConditionFunc =
-          partitionUnionKey ->
-              UnionKey.startsWith(partitionUnionKey, event.getAppId(), event.getShuffleIds());
-    }
-    long startTime = System.currentTimeMillis();
-    deleteElement(partitionsOfStorage, deleteConditionFunc);
+      // A ShufflePurgeEvent may name several shuffles; clean the entries of each of them.
+      for (int shuffleId : event.getShuffleIds()) {
+        deleteElement(
+            UnionKey.buildKey(appId, shuffleId),
+            partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, appId, shuffleId));
+      }
+    } else {
+      throw new RssException("Unsupported purge event for storage selection cache: " + event);
+    }
     LOG.info(
         "Cleaning the storage selection cache costs: {}(ms) for event: {}",
         System.currentTimeMillis() - startTime,
         event);
   }
 
-  private <K, V> void deleteElement(Map<K, V> map, Function<K, Boolean> deleteConditionFunc) {
-    map.entrySet().removeIf(entry -> deleteConditionFunc.apply(entry.getKey()));
+  /**
+   * Removes the cache entries whose key starts with {@code prefixKey} and satisfies {@code
+   * deleteConditionFunc}.
+   *
+   * <p>Keys sharing a string prefix form one contiguous run of the sorted map that begins at
+   * {@code tailMap(prefixKey)}, so only that run is visited. A key inside the run that the
+   * predicate rejects is skipped rather than ending the scan: for a prefix such as "app_1", the
+   * run may start with "app_10_0", which sorts before "app_1_0".
+   * Assumes {@code deleteConditionFunc} only accepts keys that start with {@code prefixKey}.
+   */
+  private void deleteElement(String prefixKey, Function<String, Boolean> deleteConditionFunc) {
+    Iterator<Map.Entry<String, LocalStorage>> iterator =
+        sortedPartitionsOfStorageMap.tailMap(prefixKey).entrySet().iterator();
+    while (iterator.hasNext()) {
+      String partitionUnionKey = iterator.next().getKey();
+      if (!partitionUnionKey.startsWith(prefixKey)) {
+        // Past the end of the prefix run: no later key can start with prefixKey.
+        break;
+      }
+      if (deleteConditionFunc.apply(partitionUnionKey)) {
+        iterator.remove();
+      }
+    }
   }
 
   @Override
@@ -379,4 +395,10 @@ public class LocalStorageManager extends 
SingleStorageManager {
   public List<LocalStorage> getStorages() {
     return localStorages;
   }
+
+  /** Returns a read-only view of the partition-to-storage selection cache, for tests. */
+  @VisibleForTesting
+  public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() {
+    return Collections.unmodifiableMap(sortedPartitionsOfStorageMap);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 82974e305..ec767bc63 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.server.storage.HadoopStorageManager;
 import org.apache.uniffle.server.storage.HybridStorageManager;
 import org.apache.uniffle.server.storage.LocalStorageManager;
@@ -467,30 +468,43 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     manager.addToFlushQueue(event1);
     ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId2, 1, 0, 
1, null);
     manager.addToFlushQueue(event2);
+    ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 
1, null);
+    manager.addToFlushQueue(event3);
     assertEquals(storageManager.selectStorage(event1), 
storageManager.selectStorage(event2));
     final AbstractStorage storage = (AbstractStorage) 
storageManager.selectStorage(event1);
     waitForFlush(manager, appId1, 1, 5);
     waitForFlush(manager, appId2, 1, 5);
+    waitForFlush(manager, appId2, 2, 5);
     assertEquals(5, manager.getCommittedBlockIds(appId1, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
+    assertEquals(5, manager.getCommittedBlockIds(appId2, 
2).getLongCardinality());
     assertEquals(2, storage.getHandlerSize());
     File file = new File(tempDir, appId1);
     assertTrue(file.exists());
+    storageManager.removeResources(
+        new ShufflePurgeEvent(appId1, StringUtils.EMPTY, 
Lists.newArrayList(1)));
+    ShuffleDataFlushEvent event4 = createShuffleDataFlushEvent(appId1, 1, 0, 
1, () -> false);
+    manager.addToFlushQueue(event4);
+    Thread.sleep(1000);
     storageManager.removeResources(
         new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
     manager.removeResources(appId1);
     assertFalse(file.exists());
-    ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId1, 1, 0, 
1, () -> false);
-    manager.addToFlushQueue(event3);
-    Thread.sleep(1000);
     assertEquals(0, manager.getCommittedBlockIds(appId1, 
1).getLongCardinality());
     assertEquals(5, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
+    assertEquals(5, manager.getCommittedBlockIds(appId2, 
2).getLongCardinality());
     assertEquals(1, storage.getHandlerSize());
     manager.removeResources(appId2);
+    storageManager.removeResources(
+        new ShufflePurgeEvent(appId2, StringUtils.EMPTY, 
Lists.newArrayList(1)));
+    storageManager.removeResources(
+        new ShufflePurgeEvent(appId2, StringUtils.EMPTY, 
Lists.newArrayList(2)));
     storageManager.removeResources(
         new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
     assertEquals(0, manager.getCommittedBlockIds(appId2, 
1).getLongCardinality());
     assertEquals(0, storage.getHandlerSize());
+    assertEquals(
+        0, ((LocalStorageManager) 
storageManager).getSortedPartitionsOfStorageMap().size());
   }
 
   private void waitForMetrics(Gauge.Child gauge, double expected, double 
delta) throws Exception {

Reply via email to