jerqi commented on code in PR #59:
URL: https://github.com/apache/incubator-uniffle/pull/59#discussion_r922690680
##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -247,43 +257,58 @@ public SendShuffleDataResult sendShuffleData(String
appId, List<ShuffleBlockInfo
return new SendShuffleDataResult(successBlockIds, failedBlockIds);
}
+ /**
+ * This method will wait until all shuffle data have been flushed
+ * to durable storage in assigned shuffle servers.
+ * @param shuffleServerInfoSet
+ * @param appId
+ * @param shuffleId
+ * @param numMaps
+ * @return
+ */
@Override
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId, int shuffleId, int numMaps) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(
+ commitSenderPoolSize == -1 ? shuffleServerInfoSet.size() :
commitSenderPoolSize
+ );
AtomicInteger successfulCommit = new AtomicInteger(0);
- shuffleServerInfoSet.stream().forEach(ssi -> {
- RssSendCommitRequest request = new RssSendCommitRequest(appId,
shuffleId);
- String errorMsg = "Failed to commit shuffle data to " + ssi + " for
shuffleId[" + shuffleId + "]";
- long startTime = System.currentTimeMillis();
- try {
- RssSendCommitResponse response =
getShuffleServerClient(ssi).sendCommit(request);
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
- int commitCount = response.getCommitCount();
- LOG.info("Successfully sendCommit for appId[" + appId + "],
shuffleId[" + shuffleId
- + "] to ShuffleServer[" + ssi.getId() + "], cost "
- + (System.currentTimeMillis() - startTime) + " ms, got committed
maps["
- + commitCount + "], map number of stage is " + numMaps);
- if (commitCount >= numMaps) {
- RssFinishShuffleResponse rfsResponse =
- getShuffleServerClient(ssi).finishShuffle(new
RssFinishShuffleRequest(appId, shuffleId));
- if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
- String msg = "Failed to finish shuffle to " + ssi + " for
shuffleId[" + shuffleId
- + "] with statusCode " + rfsResponse.getStatusCode();
- LOG.error(msg);
- throw new Exception(msg);
- } else {
- LOG.info("Successfully finish shuffle to " + ssi + " for
shuffleId[" + shuffleId + "]");
+ forkJoinPool.submit(() -> {
+ shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+ RssSendCommitRequest request = new RssSendCommitRequest(appId,
shuffleId);
+ String errorMsg = "Failed to commit shuffle data to " + ssi + " for
shuffleId[" + shuffleId + "]";
+ long startTime = System.currentTimeMillis();
+ try {
+ RssSendCommitResponse response =
getShuffleServerClient(ssi).sendCommit(request);
+ if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ int commitCount = response.getCommitCount();
+ LOG.info("Successfully sendCommit for appId[" + appId + "],
shuffleId[" + shuffleId
+ + "] to ShuffleServer[" + ssi.getId() + "], cost "
+ + (System.currentTimeMillis() - startTime) + " ms, got
committed maps["
+ + commitCount + "], map number of stage is " + numMaps);
+ if (commitCount >= numMaps) {
+ RssFinishShuffleResponse rfsResponse =
+ getShuffleServerClient(ssi).finishShuffle(new
RssFinishShuffleRequest(appId, shuffleId));
+ if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ String msg = "Failed to finish shuffle to " + ssi + " for
shuffleId[" + shuffleId
+ + "] with statusCode " + rfsResponse.getStatusCode();
+ LOG.error(msg);
+ throw new Exception(msg);
+ } else {
+ LOG.info("Successfully finish shuffle to " + ssi + " for
shuffleId[" + shuffleId + "]");
+ }
}
+ } else {
+ String msg = errorMsg + " with statusCode " +
response.getStatusCode();
+ LOG.error(msg);
+ throw new Exception(msg);
}
- } else {
- String msg = errorMsg + " with statusCode " +
response.getStatusCode();
- LOG.error(msg);
- throw new Exception(msg);
+ successfulCommit.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error(errorMsg, e);
}
- successfulCommit.incrementAndGet();
- } catch (Exception e) {
- LOG.error(errorMsg, e);
- }
- });
+ });
+ }).join();
+ forkJoinPool.shutdownNow();
Review Comment:
I'm just worried that the ForkJoinPool won't be shut down if an exception is thrown before this line is reached.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]