advancedxy commented on code in PR #424:
URL: https://github.com/apache/incubator-uniffle/pull/424#discussion_r1049510529
##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java:
##########
@@ -571,10 +574,11 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request,
     String requestInfo = "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
-    shuffleServer.getStorageManager()
-        .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId))
-        .updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
-
+    Storage storage = shuffleServer.getStorageManager()
+        .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId));
+    if (storage != null) {
+      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
Review Comment:
Not related to this PR, but it occurred to me that it's not necessary to create a `new StorageReadMetrics` on every local shuffle read. It may put a bit more GC pressure here.
Could you create a new issue to address this problem?
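One possible direction, sketched below under my own assumptions: memoize the metrics object per (appId, shuffleId) so a read only allocates on the first access. `StorageReadMetricsCache` and the stand-in `StorageReadMetrics` class here are hypothetical, not Uniffle's actual types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: keep one metrics object per (appId, shuffleId) so the
// read path does not allocate a new StorageReadMetrics on every request.
class StorageReadMetricsCache {
  // Minimal stand-in for the real StorageReadMetrics; only the key fields matter here.
  static final class StorageReadMetrics {
    final String appId;
    final int shuffleId;
    StorageReadMetrics(String appId, int shuffleId) {
      this.appId = appId;
      this.shuffleId = shuffleId;
    }
  }

  private final Map<String, StorageReadMetrics> cache = new ConcurrentHashMap<>();

  StorageReadMetrics get(String appId, int shuffleId) {
    // computeIfAbsent allocates only on the first read of each (appId, shuffleId).
    return cache.computeIfAbsent(appId + "/" + shuffleId,
        key -> new StorageReadMetrics(appId, shuffleId));
  }

  int size() {
    return cache.size();
  }
}
```

The cache would of course need to be cleared when an app is purged, which is why a dedicated issue makes sense.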
##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -139,32 +140,60 @@ public class LocalStorageManager extends SingleStorageManager {
   @Override
   public Storage selectStorage(ShuffleDataFlushEvent event) {
-    LocalStorage storage = localStorages.get(ShuffleStorageUtils.getStorageIndex(
-        localStorages.size(),
-        event.getAppId(),
-        event.getShuffleId(),
-        event.getStartPartition()));
-    if (storage.containsWriteHandler(event.getAppId(), event.getShuffleId(), event.getStartPartition())
-        && storage.isCorrupted()) {
-      LOG.error("storage " + storage.getBasePath() + " is corrupted");
-    }
-    if (storage.isCorrupted()) {
-      storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), event.getStartPartition());
+    String appId = event.getAppId();
+    int shuffleId = event.getShuffleId();
+    int partitionId = event.getStartPartition();
+
+    try {
+      LocalStorage storage = partitionsOfStorage.get(appId).get(shuffleId).get(partitionId);
+      if (storage.isCorrupted()) {
+        if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
+          throw new RuntimeException("LocalStorage: " + storage.getBasePath() + " is corrupted.");
+        } else {
+          // clear the last selection due to non written.
+          partitionsOfStorage.get(appId).get(shuffleId).remove(partitionId);
+        }
+      } else {
+        return storage;
+      }
+    } catch (NullPointerException npe) {
+      // Ignore
     }
+
+    // Firstly getting the storage based on its (appId, shuffleId, partitionId) hash value
+    List<LocalStorage> candidates = localStorages
+        .stream()
+        .filter(x -> x.canWrite() && !x.isCorrupted())
+        .collect(Collectors.toList());
+    LocalStorage storage = candidates.get(
+        ShuffleStorageUtils.getStorageIndex(
+            candidates.size(),
+            appId,
+            shuffleId,
+            partitionId
+        )
+    );
     event.setUnderStorage(storage);
+
+    // store it to cache.
+    partitionsOfStorage.computeIfAbsent(appId, key -> Maps.newConcurrentMap());
+    partitionsOfStorage.get(appId).computeIfAbsent(shuffleId, key -> Maps.newConcurrentMap());
+    partitionsOfStorage.get(appId).get(shuffleId).put(partitionId, storage);
     return storage;
   }
   @Override
   public Storage selectStorage(ShuffleDataReadEvent event) {
+    String appId = event.getAppId();
+    int shuffleId = event.getShuffleId();
+    int partitionId = event.getStartPartition();
-    LocalStorage storage = localStorages.get(ShuffleStorageUtils.getStorageIndex(
-        localStorages.size(),
-        event.getAppId(),
-        event.getShuffleId(),
-        event.getStartPartition()));
-    if (storage.isCorrupted()) {
-      storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), event.getStartPartition());
+    LocalStorage storage = null;
+    try {
+      storage = partitionsOfStorage.get(appId).get(shuffleId).get(partitionId);
Review Comment:
I believe this logic is not right.
Think about this case:
```
# App: 1, shuffle: 2, partition: 0.
WriteEvent(1,2,0) -> disk1
WriteEvent(1,2,0) -> disk2 (moved to disk2 as disk1 reaches its high watermark)
--
ReadEvent(1,2,0) -> disk2. The data written to disk1 is lost as it's not selected.
```
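One way to avoid losing the earlier disk would be to record every storage a partition has written to, not only the latest one, so the read path can consult all of them. The sketch below is hypothetical: `PartitionStorageIndex` and the `String` disk names stand in for the real `LocalStorage` bookkeeping.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of the failure mode above: if the cache keeps only the
// most recent storage per partition, earlier writes become unreachable.
// Keeping a list of every storage the partition wrote to lets a read visit
// each of them in write order.
class PartitionStorageIndex {
  private final Map<String, List<String>> storagesByPartition = new ConcurrentHashMap<>();

  void recordWrite(String appId, int shuffleId, int partitionId, String storage) {
    String key = appId + "/" + shuffleId + "/" + partitionId;
    List<String> storages =
        storagesByPartition.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
    // Append every distinct storage rather than overwriting the last one.
    // (Not atomic under races; good enough for illustrating the idea.)
    if (!storages.contains(storage)) {
      storages.add(storage);
    }
  }

  List<String> storagesForRead(String appId, int shuffleId, int partitionId) {
    return storagesByPartition.getOrDefault(
        appId + "/" + shuffleId + "/" + partitionId, List.of());
  }
}
```

With this shape, the `ReadEvent(1,2,0)` above would see `[disk1, disk2]` instead of only `disk2`.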
##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -69,8 +69,9 @@ public class LocalStorageManager extends SingleStorageManager {
   private final List<LocalStorage> localStorages;
   private final List<String> storageBasePaths;
   private final LocalStorageChecker checker;
-  private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
-  private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
+
+  // AppId -> ShuffleId -> PartitionId -> LocalStorage
+  private final Map<String, Map<Integer, Map<Integer, LocalStorage>>> partitionsOfStorage = Maps.newConcurrentMap();
Review Comment:
I'm not sure about, and am always concerned by, how much memory pressure this would put on the server.
I saw another place with a similar structure, such as the following code in ShuffleFlushManager:
```
// appId -> shuffleId -> partitionId -> handlers
private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
```
There could be millions of partitionIds for large apps. Are there any other ways to reconstruct this info?
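One hypothetical direction for shrinking the footprint: since `localStorages` is a small list, the cache only needs a small integer index per partition rather than a map entry holding an object reference. The sketch below (all names made up) keeps a flat `int[]` per (appId, shuffleId), so a million partitions cost roughly 4 MB per shuffle instead of a `ConcurrentMap` node each.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a denser layout: one int (an index into
// localStorages) per partition, stored in a flat array per (appId, shuffleId).
class CompactPartitionIndex {
  private static final int UNASSIGNED = -1;
  private final Map<String, int[]> indexByShuffle = new ConcurrentHashMap<>();

  private int[] slots(String appId, int shuffleId, int numPartitions) {
    return indexByShuffle.computeIfAbsent(appId + "/" + shuffleId, key -> {
      int[] arr = new int[numPartitions];
      Arrays.fill(arr, UNASSIGNED);  // -1 means "no storage selected yet"
      return arr;
    });
  }

  void record(String appId, int shuffleId, int numPartitions, int partitionId, int storageIndex) {
    slots(appId, shuffleId, numPartitions)[partitionId] = storageIndex;
  }

  // Returns the storage index, or -1 when no storage was recorded yet.
  int lookup(String appId, int shuffleId, int numPartitions, int partitionId) {
    return slots(appId, shuffleId, numPartitions)[partitionId];
  }
}
```

This assumes the partition count per shuffle is known up front; if it isn't, a growable array or a primitive-keyed map would be needed instead.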
##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -69,8 +69,9 @@ public class LocalStorageManager extends SingleStorageManager {
   private final List<LocalStorage> localStorages;
   private final List<String> storageBasePaths;
   private final LocalStorageChecker checker;
-  private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
-  private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
+
+  // AppId -> ShuffleId -> PartitionId -> LocalStorage
+  private final Map<String, Map<Integer, Map<Integer, LocalStorage>>> partitionsOfStorage = Maps.newConcurrentMap();
Review Comment:
I took a quick look at how Spark handles local map file distribution.
I believe the current strategy is similar to Spark's. Maybe we should start uploading local files before the high watermark is reached, rather than only reacting once it is hit?
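A minimal sketch of that idea, with made-up names and thresholds: introduce a soft watermark below the hard one, so uploads begin while the disk still has headroom and writes aren't forced to move elsewhere right at the limit.

```java
// Hypothetical sketch: a two-threshold policy. Uploading starts at the soft
// watermark; writes are refused only at the hard high watermark. The class
// and threshold values are illustrative, not Uniffle configuration.
class WatermarkPolicy {
  private final double softWatermark;  // e.g. 0.75: start uploading early
  private final double highWatermark;  // e.g. 0.90: stop accepting new writes

  WatermarkPolicy(double softWatermark, double highWatermark) {
    this.softWatermark = softWatermark;
    this.highWatermark = highWatermark;
  }

  boolean shouldStartUpload(double diskUsageRatio) {
    return diskUsageRatio >= softWatermark;
  }

  boolean canWrite(double diskUsageRatio) {
    return diskUsageRatio < highWatermark;
  }
}
```

With a disk at 80% usage, uploads would already be running while the disk still accepts writes, narrowing the window in which a partition's data gets split across disks.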
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]