rishisankar commented on a change in pull request #1770: URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r485329143
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java ########## @@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher } } - final NamedList<Throwable> exceptions = new NamedList<>(); - @SuppressWarnings({"rawtypes"}) - final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery + final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery long start = System.nanoTime(); + CompletableFuture<Void> updateFuture; if (parallelUpdates) { - final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size()); - for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { - final String url = entry.getKey(); - final LBSolrClient.Req lbRequest = entry.getValue(); - try { - MDC.put("CloudSolrClient.url", url); - responseFutures.put(url, threadPool.submit(() -> { - return getLbClient().request(lbRequest).getResponse(); - })); - } finally { - MDC.remove("CloudSolrClient.url"); + updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest); + } else { + updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest); + } + + CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>(); + if (!isAsyncRequest) { + try { + updateFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new SolrServerException(cause); } } + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } else { + updateFuture.whenComplete((result, error) -> { + if 
(updateFuture.isCompletedExceptionally()) { + apiFuture.completeExceptionally(error); + } else { + doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest); + } + }); - for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) { - final String url = entry.getKey(); - final Future<NamedList<?>> responseFuture = entry.getValue(); - try { - shardResponses.add(url, responseFuture.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - exceptions.add(url, e.getCause()); + apiFuture.exceptionally((error) -> { + if (apiFuture.isCancelled()) { + updateFuture.cancel(true); } + return null; + }); + } + + return apiFuture; + } + + private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes, + NamedList<NamedList<?>> shardResponses, + boolean isAsyncRequest) { + final NamedList<Throwable> exceptions = new NamedList<>(); + final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size()); + for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBSolrClient.Req lbRequest = entry.getValue(); + try { + MDC.put("CloudSolrClient.url", url); + final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>(); + if (isAsyncRequest) { + CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest); + reqFuture.whenComplete((result, error) -> { + if (!reqFuture.isCompletedExceptionally()) { + future.complete(result.getResponse()); + } else { + future.completeExceptionally(error); + } + }); + } else { Review comment: I've added an implementation (in the SolrClient class) for requestAsync with an executor (so that the request can be canceled). 
Currently, it only takes an `ExecutorService`. I could switch `ExecutorService` to `Executor`, but the drawback is that the async request would then not be cancellable. Which do you think would be better? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org