jerqi commented on code in PR #249:
URL: https://github.com/apache/incubator-uniffle/pull/249#discussion_r991774740
##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -591,6 +593,45 @@ public void close() {
     dataTransferPool.shutdownNow();
   }
+  @Override
+  public void unregisterShuffle(String appId, int shuffleId) {
+    RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+
+    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+      callableList.add(() -> {
+        try {
+          ShuffleServerClient client =
+              ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
+          RssUnregisterShuffleResponse response = client.unregisterShuffle(request);
+          if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+            LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+          }
+        } catch (Exception e) {
+          LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+        }
+        return null;
+      });
+    }
+    );
+
+    try {
+      ExecutorService executorService =
+          Executors.newFixedThreadPool(
+              Math.min(10, shuffleServerInfoSet.size()),
+              ThreadUtils.getThreadFactory("unregister-shuffle-%d")
+          );
+      List<Future<Void>> futures = executorService.invokeAll(callableList, 10, TimeUnit.SECONDS);
Review Comment:
The hard-coded 10-second timeout may not always be enough. We should make it
configurable and give it a default value based on our experience; we usually
can't imagine every situation in which users will run this.
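
A minimal sketch of what a configurable timeout could look like, using only the JDK. The class name, the `DEFAULT_UNREGISTER_TIMEOUT_SEC` constant, and the `runWithTimeout` helper are hypothetical names chosen for illustration, not part of this PR or of the Uniffle API; in the real client the value would presumably come from a client configuration entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not code from the PR.
class UnregisterTimeoutSketch {

  // Assumed default, mirroring the hard-coded 10 seconds in the diff.
  static final long DEFAULT_UNREGISTER_TIMEOUT_SEC = 10L;
  // Same upper bound on pool size as in the diff.
  static final int MAX_THREADS = 10;

  /**
   * Runs all tasks with a bounded wait and returns how many of them
   * were NOT cancelled by the timeout.
   */
  static int runWithTimeout(List<Callable<Void>> tasks, long timeoutSec)
      throws InterruptedException {
    if (tasks.isEmpty()) {
      // Executors.newFixedThreadPool(0) throws IllegalArgumentException.
      return 0;
    }
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.min(MAX_THREADS, tasks.size()));
    try {
      // invokeAll cancels every task that has not finished when the timeout expires.
      List<Future<Void>> futures = pool.invokeAll(tasks, timeoutSec, TimeUnit.SECONDS);
      int notCancelled = 0;
      for (Future<Void> future : futures) {
        if (!future.isCancelled()) {
          notCancelled++;
        }
      }
      return notCancelled;
    } finally {
      // Release the threads whether or not the tasks finished.
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<Callable<Void>> tasks = new ArrayList<>();
    tasks.add(() -> null); // stands in for one unregister RPC
    int finished = runWithTimeout(tasks, DEFAULT_UNREGISTER_TIMEOUT_SEC);
    System.out.println("tasks finished within timeout: " + finished);
  }
}
```

Passing the timeout as a parameter keeps the default in one place and lets callers with many or slow shuffle servers raise it. Counting the cancelled futures also gives the caller something to log when the timeout is hit.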
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]