jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922687334


##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
   private final int dataTransferPoolSize;
+  private final int commitSenderPoolSize;

Review Comment:
   ditto.



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -91,11 +91,13 @@ public static ShuffleWriteClient 
createShuffleClient(JobConf jobConf) {
         RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     int dataTransferPoolSize = 
jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
         RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    int commitSenderPoolSize = 
jobConf.getInt(RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE,

Review Comment:
   Could we rename this variable?



##########
client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java:
##########
@@ -36,9 +36,10 @@ public static ShuffleClientFactory getInstance() {
 
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
-      int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize) {
+      int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
+      int commitSenderPoolSize) {
     return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled, 
dataTransferPoolSize);
+      replica, replicaWrite, replicaRead, replicaSkipEnabled, 
dataTransferPoolSize, commitSenderPoolSize);

Review Comment:
   ditto.



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String 
appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, 
String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : 
commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed 
maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got 
committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   Since this change adds a call to `shutdownNow`, we should probably rerun the performance tests.



##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -88,11 +88,20 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
   private int replicaRead;
   private boolean replicaSkipEnabled;
   private int dataTranferPoolSize;
+  private int commitSenderPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
 
-  public ShuffleWriteClientImpl(String clientType, int retryMax, long 
retryIntervalMax, int heartBeatThreadNum,
-                                int replica, int replicaWrite, int 
replicaRead, boolean replicaSkipEnabled,
-                                int dataTranferPoolSize) {
+  public ShuffleWriteClientImpl(
+      String clientType,
+      int retryMax,
+      long retryIntervalMax,
+      int heartBeatThreadNum,
+      int replica,
+      int replicaWrite,
+      int replicaRead,
+      boolean replicaSkipEnabled,
+      int dataTranferPoolSize,
+      int commitSenderPoolSize) {

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to