rishisankar commented on a change in pull request #1770:
URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r485329143



##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String 
collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {

Review comment:
       I've added an implementation (in the SolrClient class) for requestAsync 
with an executor (so that the request can be canceled). Currently, it only 
takes an `ExecutorService`, although I can switch `ExecutorService` to 
`Executor` with the drawback of the async request not being cancellable - 
curious which you think would be better?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to