advancedxy commented on code in PR #1138:
URL: https://github.com/apache/incubator-uniffle/pull/1138#discussion_r1360032670


##########
client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java:
##########
@@ -397,7 +399,7 @@ public SendShuffleDataResult sendShuffleData(
         for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
           successBlockIds.add(blockInfo.getBlockId());
         }
-        return new SendShuffleDataResult(successBlockIds, Sets.newHashSet());
+        return new SendShuffleDataResult(successBlockIds, Sets.newHashSet(), Maps.newHashMap());

Review Comment:
   ditto.



##########
client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java:
##########
@@ -382,7 +383,8 @@ public SendShuffleDataResult sendShuffleData(
       if (mode == 0) {
         throw new RssException("send data failed");
       } else if (mode == 1) {
-        return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
+        return new SendShuffleDataResult(
+            Sets.newHashSet(2L), Sets.newHashSet(1L), Maps.newHashMap());

Review Comment:
   I don't think this change is needed: the two-argument constructor of 
`SendShuffleDataResult` is kept for compatibility.
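To make the compatibility point concrete, here is a minimal, self-contained sketch of the pattern. The class and field names are simplified stand-ins (with `String` in place of `ShuffleServerInfo`), not the actual Uniffle `SendShuffleDataResult`:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for SendShuffleDataResult.
class SendResult {
  private final Set<Long> successBlockIds;
  private final Set<Long> failedBlockIds;
  private final Map<Long, List<String>> failedBlockIdsWithServers;

  // New three-argument constructor carrying the extra per-block failure map.
  SendResult(Set<Long> success, Set<Long> failed, Map<Long, List<String>> failedWithServers) {
    this.successBlockIds = success;
    this.failedBlockIds = failed;
    this.failedBlockIdsWithServers = failedWithServers;
  }

  // Old two-argument constructor kept for compatibility: existing callers,
  // such as the test in this hunk, compile unchanged and get an empty map.
  SendResult(Set<Long> success, Set<Long> failed) {
    this(success, failed, Collections.emptyMap());
  }

  Map<Long, List<String>> getFailedBlockIdsWithServers() {
    return failedBlockIdsWithServers;
  }
}

public class CompatConstructorDemo {
  public static void main(String[] args) {
    // A caller written against the old two-argument API keeps working.
    SendResult r = new SendResult(Set.of(2L), Set.of(1L));
    System.out.println(r.getFailedBlockIdsWithServers().isEmpty()); // prints "true"
  }
}
```

With the delegating constructor in place, test call sites like the one above only need updating when they actually care about the new failure map.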



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -986,4 +992,18 @@ private Roaring64NavigableMap getShuffleResultForMultiPart(
           e, sparkConf, appId, shuffleId, stageAttemptId, failedPartitions);
     }
   }
+
+  /**
+   * Returns the ShuffleServers that failed to receive blocks, keyed by block ID,
+   * for the given shuffle task.
+   *
+   * @param taskId the shuffle task ID
+   * @return a map from block ID to the ShuffleServers that failed for that block
+   */
+  public Map<Long, List<ShuffleServerInfo>> getFailedBlockIdsWithShuffleServer(String taskId) {

Review Comment:
   when is this method called?
   
   And it seems a bit unusual to return an empty map without putting it back 
into `taskToFailedBlockIdsAndServer`.
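The distinction the comment raises can be sketched as follows. This is a hypothetical tracker, not Uniffle code: the field name mirrors the PR's `taskToFailedBlockIdsAndServer`, `String` stands in for `ShuffleServerInfo`, and both lookup variants are assumptions for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical tracker illustrating the two lookup behaviours.
class FailedBlockTracker {
  private final Map<String, Map<Long, List<String>>> taskToFailedBlockIdsAndServer =
      new ConcurrentHashMap<>();

  // Lookup as questioned in the review: unknown tasks get a fresh, detached
  // empty map that is never stored, so mutations by the caller are lost.
  Map<Long, List<String>> getDetached(String taskId) {
    return taskToFailedBlockIdsAndServer.getOrDefault(taskId, new ConcurrentHashMap<>());
  }

  // Alternative the comment hints at: materialize the map on first access,
  // so every caller shares the instance stored in the tracker.
  Map<Long, List<String>> getOrCreate(String taskId) {
    return taskToFailedBlockIdsAndServer.computeIfAbsent(taskId, k -> new ConcurrentHashMap<>());
  }
}

public class TrackerDemo {
  public static void main(String[] args) {
    FailedBlockTracker t = new FailedBlockTracker();

    t.getDetached("task_0").put(1L, new CopyOnWriteArrayList<>(List.of("server-1")));
    System.out.println(t.getDetached("task_0").size()); // prints 0: write was lost

    t.getOrCreate("task_0").put(1L, new CopyOnWriteArrayList<>(List.of("server-1")));
    System.out.println(t.getOrCreate("task_0").size()); // prints 1: shared instance
  }
}
```

Which variant is right depends on whether callers are ever expected to mutate the returned map; for a pure read-side accessor, the detached empty map is harmless.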



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -366,35 +385,18 @@ public SendShuffleDataResult sendShuffleData(
             false);
       }
     }
-
-    // maintain the count of blocks that have been sent to the server
-    // unnecessary to use concurrent hashmap here unless you need to insert or delete entries in
-    // other threads
-    // AtomicInteger is enough to reflect value changes in other threads
-    Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
-    primaryServerToBlockIds
-        .values()
-        .forEach(
-            blockList ->
-                blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
-    secondaryServerToBlockIds
-        .values()
-        .forEach(
-            blockList ->
-                blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
-
-    Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
-    Set<Long> successBlockIds = Sets.newConcurrentHashSet();
-    // if send block failed, the task will fail
-    // todo: better to have fallback solution when send to multiple servers
+    // Records, per block, the ShuffleServers to which sending succeeded or failed
+    Map<Long, List<ShuffleServerInfo>> blockIdSendSuccessTracker = JavaUtils.newConcurrentMap();
+    Map<Long, List<ShuffleServerInfo>> blockIdsSendFailTracker = JavaUtils.newConcurrentMap();

Review Comment:
   Reminder: let's review the memory usage of this change once all the code 
is finished.
   
   I'm a bit worried about the memory overhead.
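A rough back-of-envelope for that concern: the new trackers keep a list of server references per block ID for both successes and failures, so the footprint grows with (#blocks × replicas). Every number below is an assumption chosen for illustration, not a measurement of Uniffle:

```java
// Back-of-envelope sketch of tracker memory growth. All constants are
// assumed round figures for a typical JVM, not measured values.
public class TrackerMemorySketch {

  // Rough per-tracker estimate: one map entry (entry object, boxed Long
  // key, list header) plus one reference slot per recorded server.
  static long approxBytes(long blocks, int serversPerBlock) {
    long bytesPerEntry = 48; // assumed: entry + boxed key + list header
    long bytesPerSlot = 8;   // assumed: one reference per recorded server
    return blocks * (bytesPerEntry + (long) serversPerBlock * bytesPerSlot);
  }

  public static void main(String[] args) {
    long perTracker = approxBytes(1_000_000L, 2); // 1M blocks, 2 replicas (assumed)
    long both = 2 * perTracker;                   // success tracker + failure tracker
    System.out.println("approx bytes per tracker: " + perTracker); // 64000000
    System.out.println("approx bytes for both trackers: " + both); // 128000000
  }
}
```

Under these assumptions a million-block task costs on the order of 100 MB across the two trackers, which is why reviewing the real footprint (e.g. with a heap histogram) before merging seems worthwhile.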



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

