This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b4adaa50b [#1209] improvement(server): Speed up
cleanupStorageSelectionCache method in LocalStorageManager. (#1210)
b4adaa50b is described below
commit b4adaa50b7474330ed7723bba3976e090b8bd022
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon Sep 25 13:49:06 2023 +0800
[#1209] improvement(server): Speed up cleanupStorageSelectionCache method
in LocalStorageManager. (#1210)
### What changes were proposed in this pull request?
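Optimize the cleanupStorageSelectionCache method in LocalStorageManager: the
partition-to-storage cache is now kept in a ConcurrentSkipListMap, and cleanup
iterates only over tailMap(prefixKey), stopping at the first key that does not
match the purge event, instead of scanning every entry with removeIf.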
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1209
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
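Existing UTs. ShuffleFlushManagerTest is also extended to issue
ShufflePurgeEvent removals and to assert that the storage selection cache is
empty after cleanup.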
---
.../server/storage/LocalStorageManager.java | 44 ++++++++++++++++------
.../uniffle/server/ShuffleFlushManagerTest.java | 20 ++++++++--
2 files changed, 50 insertions(+), 14 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 0752abcc0..ea47ae1ad 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -22,12 +22,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +52,6 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
-import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.Checker;
@@ -81,7 +82,7 @@ public class LocalStorageManager extends SingleStorageManager
{
private final List<String> storageBasePaths;
private final LocalStorageChecker checker;
- private final Map<String, LocalStorage> partitionsOfStorage;
+ private final ConcurrentSkipListMap<String, LocalStorage>
sortedPartitionsOfStorageMap;
private final List<StorageMediaProvider> typeProviders =
Lists.newArrayList();
@VisibleForTesting
@@ -91,7 +92,7 @@ public class LocalStorageManager extends SingleStorageManager
{
if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}
- this.partitionsOfStorage = JavaUtils.newConcurrentMap();
+ this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>();
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite =
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
@@ -182,7 +183,7 @@ public class LocalStorageManager extends
SingleStorageManager {
int partitionId = event.getStartPartition();
LocalStorage storage =
- partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId,
partitionId));
+ sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId,
partitionId));
if (storage != null) {
if (storage.isCorrupted()) {
if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
@@ -210,7 +211,7 @@ public class LocalStorageManager extends
SingleStorageManager {
final LocalStorage selectedStorage =
candidates.get(
ShuffleStorageUtils.getStorageIndex(candidates.size(), appId,
shuffleId, partitionId));
- return partitionsOfStorage.compute(
+ return sortedPartitionsOfStorageMap.compute(
UnionKey.buildKey(appId, shuffleId, partitionId),
(key, localStorage) -> {
// If this is the first time to select storage or existing storage
is corrupted,
@@ -231,7 +232,7 @@ public class LocalStorageManager extends
SingleStorageManager {
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();
- return partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId,
partitionId));
+ return sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId,
shuffleId, partitionId));
}
@Override
@@ -301,24 +302,39 @@ public class LocalStorageManager extends
SingleStorageManager {
private void cleanupStorageSelectionCache(PurgeEvent event) {
Function<String, Boolean> deleteConditionFunc = null;
+ String prefixKey = null;
if (event instanceof AppPurgeEvent) {
+ prefixKey = UnionKey.buildKey(event.getAppId());
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey,
event.getAppId());
} else if (event instanceof ShufflePurgeEvent) {
+ int shuffleId = event.getShuffleIds().get(0);
+ prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
deleteConditionFunc =
- partitionUnionKey ->
- UnionKey.startsWith(partitionUnionKey, event.getAppId(),
event.getShuffleIds());
+ partitionUnionKey -> UnionKey.startsWith(partitionUnionKey,
event.getAppId(), shuffleId);
+ }
+ if (prefixKey == null) {
+ throw new RssException("Prefix key is null when handles event: " +
event);
}
long startTime = System.currentTimeMillis();
- deleteElement(partitionsOfStorage, deleteConditionFunc);
+ deleteElement(sortedPartitionsOfStorageMap.tailMap(prefixKey),
deleteConditionFunc);
LOG.info(
"Cleaning the storage selection cache costs: {}(ms) for event: {}",
System.currentTimeMillis() - startTime,
event);
}
- private <K, V> void deleteElement(Map<K, V> map, Function<K, Boolean>
deleteConditionFunc) {
- map.entrySet().removeIf(entry ->
deleteConditionFunc.apply(entry.getKey()));
+ private <K, V> void deleteElement(
+ Map<K, V> sortedPartitionsOfStorageMap, Function<K, Boolean>
deleteConditionFunc) {
+ Iterator<Map.Entry<K, V>> iterator =
sortedPartitionsOfStorageMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<K, V> entry = iterator.next();
+ if (deleteConditionFunc.apply(entry.getKey())) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
}
@Override
@@ -379,4 +395,10 @@ public class LocalStorageManager extends
SingleStorageManager {
public List<LocalStorage> getStorages() {
return localStorages;
}
+
+ // Only for test.
+ @VisibleForTesting
+ public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() {
+ return sortedPartitionsOfStorageMap;
+ }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 82974e305..ec767bc63 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
import org.apache.uniffle.server.storage.HybridStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
@@ -467,30 +468,43 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId2, 1, 0,
1, null);
manager.addToFlushQueue(event2);
+ ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0,
1, null);
+ manager.addToFlushQueue(event3);
assertEquals(storageManager.selectStorage(event1),
storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage)
storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
+ waitForFlush(manager, appId2, 2, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1,
1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
+ assertEquals(5, manager.getCommittedBlockIds(appId2,
2).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
+ storageManager.removeResources(
+ new ShufflePurgeEvent(appId1, StringUtils.EMPTY,
Lists.newArrayList(1)));
+ ShuffleDataFlushEvent event4 = createShuffleDataFlushEvent(appId1, 1, 0,
1, () -> false);
+ manager.addToFlushQueue(event4);
+ Thread.sleep(1000);
storageManager.removeResources(
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());
- ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId1, 1, 0,
1, () -> false);
- manager.addToFlushQueue(event3);
- Thread.sleep(1000);
assertEquals(0, manager.getCommittedBlockIds(appId1,
1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
+ assertEquals(5, manager.getCommittedBlockIds(appId2,
2).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
+ storageManager.removeResources(
+ new ShufflePurgeEvent(appId2, StringUtils.EMPTY,
Lists.newArrayList(1)));
+ storageManager.removeResources(
+ new ShufflePurgeEvent(appId2, StringUtils.EMPTY,
Lists.newArrayList(2)));
storageManager.removeResources(
new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
assertEquals(0, manager.getCommittedBlockIds(appId2,
1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
+ assertEquals(
+ 0, ((LocalStorageManager)
storageManager).getSortedPartitionsOfStorageMap().size());
}
private void waitForMetrics(Gauge.Child gauge, double expected, double
delta) throws Exception {