jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690489


##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String 
appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, 
String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() : 
commitSenderPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed 
maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
-              LOG.error(msg);
-              throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
+    forkJoinPool.submit(() -> {
+      shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+        RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
+        String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
+        long startTime = System.currentTimeMillis();
+        try {
+          RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            int commitCount = response.getCommitCount();
+            LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
+                + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                + (System.currentTimeMillis() - startTime) + " ms, got 
committed maps["
+                + commitCount + "], map number of stage is " + numMaps);
+            if (commitCount >= numMaps) {
+              RssFinishShuffleResponse rfsResponse =
+                  getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
+              if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+                String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
+                    + "] with statusCode " + rfsResponse.getStatusCode();
+                LOG.error(msg);
+                throw new Exception(msg);
+              } else {
+                LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
+              }
             }
+          } else {
+            String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
+            LOG.error(msg);
+            throw new Exception(msg);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
+          successfulCommit.incrementAndGet();
+        } catch (Exception e) {
+          LOG.error(errorMsg, e);
         }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+      });
+    }).join();
+    forkJoinPool.shutdownNow();

Review Comment:
   > It looks ShutdownNow or Shutdown are all OK due to the operation of `join`
   
   It's ok for me to use method `shutdownNow()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to