advancedxy commented on code in PR #424:
URL: https://github.com/apache/incubator-uniffle/pull/424#discussion_r1049510529


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java:
##########
@@ -571,10 +574,11 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request,
     String requestInfo = "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId["
         + partitionId + "]";
 
-    shuffleServer.getStorageManager()
-        .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId))
-        .updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
-
+    Storage storage = shuffleServer.getStorageManager()
+        .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId));
+    if (storage != null) {
+      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));

Review Comment:
   Not related to this PR, but it occurred to me that it's not necessary to create a `new StorageReadMetrics` on every local shuffle read. It may add a bit of GC pressure here.
   
   Could you create a new issue to address this problem?
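   A minimal sketch of the reuse idea (the `StorageReadMetricsStub` class and `metricsFor` helper below are hypothetical stand-ins, not the actual uniffle API): cache one metrics object per `(appId, shuffleId)` pair and hand it back on every read instead of allocating a fresh one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetricsCacheSketch {

  // Hypothetical stand-in for uniffle's StorageReadMetrics.
  static class StorageReadMetricsStub {
    final String appId;
    final int shuffleId;

    StorageReadMetricsStub(String appId, int shuffleId) {
      this.appId = appId;
      this.shuffleId = shuffleId;
    }
  }

  // One cached metrics object per (appId, shuffleId); a real version would
  // also need to evict entries when the app expires, which this sketch omits.
  private static final Map<String, StorageReadMetricsStub> CACHE = new ConcurrentHashMap<>();

  static StorageReadMetricsStub metricsFor(String appId, int shuffleId) {
    return CACHE.computeIfAbsent(appId + "_" + shuffleId,
        key -> new StorageReadMetricsStub(appId, shuffleId));
  }

  public static void main(String[] args) {
    StorageReadMetricsStub first = metricsFor("app1", 2);
    StorageReadMetricsStub second = metricsFor("app1", 2);
    // Same instance both times: no per-read allocation, no extra GC work.
    System.out.println(first == second); // prints true
  }
}
```

   Whether the cache is worth its own footprint depends on how hot this read path actually is, so measuring first seems reasonable.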



##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -139,32 +140,60 @@ public class LocalStorageManager extends SingleStorageManager {
 
   @Override
   public Storage selectStorage(ShuffleDataFlushEvent event) {
-    LocalStorage storage = localStorages.get(ShuffleStorageUtils.getStorageIndex(
-        localStorages.size(),
-        event.getAppId(),
-        event.getShuffleId(),
-        event.getStartPartition()));
-    if (storage.containsWriteHandler(event.getAppId(), event.getShuffleId(), event.getStartPartition())
-        && storage.isCorrupted()) {
-      LOG.error("storage " + storage.getBasePath() + " is corrupted");
-    }
-    if (storage.isCorrupted()) {
-      storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), event.getStartPartition());
+    String appId = event.getAppId();
+    int shuffleId = event.getShuffleId();
+    int partitionId = event.getStartPartition();
+
+    try {
+      LocalStorage storage = partitionsOfStorage.get(appId).get(shuffleId).get(partitionId);
+      if (storage.isCorrupted()) {
+        if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
+          throw new RuntimeException("LocalStorage: " + storage.getBasePath() + " is corrupted.");
+        } else {
+          // clear the last selection since nothing was written to it.
+          partitionsOfStorage.get(appId).get(shuffleId).remove(partitionId);
+        }
+      } else {
+        return storage;
+      }
+    } catch (NullPointerException npe) {
+      // Ignore
     }
+
+    // Firstly getting the storage based on its (appId, shuffleId, partitionId) hash value
+    List<LocalStorage> candidates = localStorages
+        .stream()
+        .filter(x -> x.canWrite() && !x.isCorrupted())
+        .collect(Collectors.toList());
+    LocalStorage storage = candidates.get(
+        ShuffleStorageUtils.getStorageIndex(
+            candidates.size(),
+            appId,
+            shuffleId,
+            partitionId
+        )
+    );
     event.setUnderStorage(storage);
+
+    // store it to cache.
+    partitionsOfStorage.computeIfAbsent(appId, key -> Maps.newConcurrentMap());
+    partitionsOfStorage.get(appId).computeIfAbsent(shuffleId, key -> Maps.newConcurrentMap());
+    partitionsOfStorage.get(appId).get(shuffleId).put(partitionId, storage);
     return storage;
   }
 
   @Override
   public Storage selectStorage(ShuffleDataReadEvent event) {
+    String appId = event.getAppId();
+    int shuffleId = event.getShuffleId();
+    int partitionId = event.getStartPartition();
 
-    LocalStorage storage = localStorages.get(ShuffleStorageUtils.getStorageIndex(
-        localStorages.size(),
-        event.getAppId(),
-        event.getShuffleId(),
-        event.getStartPartition()));
-    if (storage.isCorrupted()) {
-      storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), event.getStartPartition());
+    LocalStorage storage = null;
+    try {
+      storage = partitionsOfStorage.get(appId).get(shuffleId).get(partitionId);

Review Comment:
   I believe this logic is not right.
   
   Think about this case:
   ```
   # App:1, shuffle: 2, partition: 0.
   WriteEvent(1,2,0) -> disk1
   WriteEvent(1,2,0) -> disk2 (moved to disk2 as disk1 reaches its high watermark)

   --
   ReadEvent(1,2,0) -> disk2. The data written to disk1 is lost as it's not selected.
   ```
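   To make the lost-data case above concrete, here is a minimal sketch (plain strings stand in for `LocalStorage`; the list-per-partition idea at the end is only one possible direction, not something this PR implements):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheOverwriteSketch {

  public static void main(String[] args) {
    // Single-slot cache, as in the PR: partitionId -> last storage written to.
    Map<Integer, String> lastStorage = new HashMap<>();
    lastStorage.put(0, "disk1"); // WriteEvent(1,2,0) lands on disk1
    lastStorage.put(0, "disk2"); // disk1 hit its high watermark; the next write moves to disk2

    // A ReadEvent(1,2,0) now only sees disk2; the blocks flushed to disk1
    // are never selected for reading.
    System.out.println(lastStorage.get(0)); // prints disk2

    // One possible fix: remember every storage the partition has written to,
    // so reads can consult all of them.
    Map<Integer, List<String>> allStorages = new HashMap<>();
    allStorages.computeIfAbsent(0, key -> new ArrayList<>()).add("disk1");
    allStorages.computeIfAbsent(0, key -> new ArrayList<>()).add("disk2");
    System.out.println(allStorages.get(0)); // prints [disk1, disk2]
  }
}
```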



##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -69,8 +69,9 @@ public class LocalStorageManager extends SingleStorageManager {
   private final List<LocalStorage> localStorages;
   private final List<String> storageBasePaths;
   private final LocalStorageChecker checker;
-  private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
-  private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
+
+  // AppId -> ShuffleId -> PartitionId -> LocalStorage
+  private final Map<String, Map<Integer, Map<Integer, LocalStorage>>> partitionsOfStorage = Maps.newConcurrentMap();

Review Comment:
   I'm not sure, and I'm always concerned, about how much memory pressure this would put on the server.
   I've seen a similar structure in another place, such as the following code in ShuffleFlushManager:
   ```
     // appId -> shuffleId -> partitionId -> handlers
     private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
   ```
   Partition counts could be in the millions for large apps. Is there any other way to reconstruct this info?



##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -69,8 +69,9 @@ public class LocalStorageManager extends SingleStorageManager {
   private final List<LocalStorage> localStorages;
   private final List<String> storageBasePaths;
   private final LocalStorageChecker checker;
-  private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
-  private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
+
+  // AppId -> ShuffleId -> PartitionId -> LocalStorage
+  private final Map<String, Map<Integer, Map<Integer, LocalStorage>>> partitionsOfStorage = Maps.newConcurrentMap();

Review Comment:
   I took a quick look at how Spark handles local map file distribution. I believe the current strategy is similar to Spark's. Rather than waiting until the high watermark is reached, maybe we should start uploading local files before reaching it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

