madrob commented on a change in pull request #1470: URL: https://github.com/apache/lucene-solr/pull/1470#discussion_r418163481
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java ########## @@ -136,6 +137,91 @@ public boolean equals(Object obj) { } } + protected static class ServerIterator { Review comment: Why is this in LBSolrClient instead of LBHttp2SolrClient? ########## File path: solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java ########## @@ -64,18 +62,23 @@ * by the RealtimeGet handler, since other types of replicas shouldn't respond to RTG requests */ public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime"; + private static final ShardResponse END_QUEUE = new ShardResponse(); private HttpShardHandlerFactory httpShardHandlerFactory; - private CompletionService<ShardResponse> completionService; - private Set<Future<ShardResponse>> pending; + private LinkedList<Cancellable> requests; Review comment: How many requests are we planning on handling? LinkedList is not great for performance due to node allocation overhead. I also don't see us removing requests anywhere? ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java ########## @@ -359,65 +366,95 @@ public void send(OutStream outStream, SolrRequest req, String collection) throws outStream.flush(); } - public NamedList<Object> request(SolrRequest solrRequest, - String collection, - OnComplete onComplete) throws IOException, SolrServerException { - Request req = makeRequest(solrRequest, collection); + private static final Exception CANCELLED_EXCEPTION = new Exception(); + + public Cancellable asyncRequest(SolrRequest solrRequest, String collection, OnComplete onComplete) { + Request req; + try { + req = makeRequest(solrRequest, collection); + } catch (SolrServerException | IOException e) { + onComplete.onFailure(e); + return () -> {}; Review comment: Might be worthwhile to create a singleton instance and reuse that, since we can give it a descriptive name like alreadyFailedRequest, assuming I understand the intent here. 
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java ########## @@ -66,4 +80,132 @@ public LBHttp2SolrClient(Http2SolrClient httpClient, String... baseSolrUrls) { protected SolrClient getClient(String baseUrl) { return httpClient; } + + public interface OnComplete { Review comment: There is already an interface OnComplete in Http2SolrClient, can we either reuse that or maybe we can use CompletableFuture or something here? Having two similarly named interfaces is going to get confusing, I think. ########## File path: solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java ########## @@ -243,12 +233,13 @@ public ShardResponse takeCompletedOrError() { } private ShardResponse take(boolean bailOnError) { + try { + while (pending.get() > 0) { + ShardResponse rsp = responses.take(); + if (rsp == END_QUEUE) Review comment: Do we need to reinsert the END_QUEUE element in case there are other threads? ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java ########## @@ -359,65 +366,95 @@ public void send(OutStream outStream, SolrRequest req, String collection) throws outStream.flush(); } - public NamedList<Object> request(SolrRequest solrRequest, - String collection, - OnComplete onComplete) throws IOException, SolrServerException { - Request req = makeRequest(solrRequest, collection); + private static final Exception CANCELLED_EXCEPTION = new Exception(); + + public Cancellable asyncRequest(SolrRequest solrRequest, String collection, OnComplete onComplete) { + Request req; + try { + req = makeRequest(solrRequest, collection); + } catch (SolrServerException | IOException e) { + onComplete.onFailure(e); + return () -> {}; + } final ResponseParser parser = solrRequest.getResponseParser() == null ? 
this.parser: solrRequest.getResponseParser(); - - if (onComplete != null) { - // This async call only suitable for indexing since the response size is limited by 5MB - req.onRequestQueued(asyncTracker.queuedListener) - .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) { - - @Override - public void onComplete(Result result) { - if (result.isFailed()) { - onComplete.onFailure(result.getFailure()); - return; + req.onRequestQueued(asyncTracker.queuedListener) + .onComplete(asyncTracker.completeListener) + .send(new InputStreamResponseListener() { + @Override + public void onHeaders(Response response) { + super.onHeaders(response); + InputStreamResponseListener listener = this; + executor.execute(() -> { + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); + try { + NamedList<Object> body = processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); + onComplete.onSuccess(body); + } catch (RemoteSolrException e) { + if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) { + onComplete.onFailure(e); + } + } catch (SolrServerException e) { + onComplete.onFailure(e); + } + }); } - NamedList<Object> rsp; - try { - InputStream is = getContentAsInputStream(); - assert ObjectReleaseTracker.track(is); - rsp = processErrorsAndResponse(result.getResponse(), - parser, is, getEncoding(), isV2ApiRequest(solrRequest)); - onComplete.onSuccess(rsp); - } catch (Exception e) { - onComplete.onFailure(e); + @Override + public void onFailure(Response response, Throwable failure) { + super.onFailure(response, failure); + if (failure != CANCELLED_EXCEPTION) { + onComplete.onFailure(createException(req, failure)); + } } - } - }); - return null; - } else { - try { - InputStreamResponseListener listener = new InputStreamResponseListener(); - req.send(listener); - Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); - InputStream is = 
listener.getInputStream(); - assert ObjectReleaseTracker.track(is); - return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new SolrServerException( - "Timeout occured while waiting response from server at: " + req.getURI(), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof ConnectException) { - throw new SolrServerException("Server refused connection at: " + req.getURI(), cause); - } - if (cause instanceof SolrServerException) { - throw (SolrServerException) cause; - } else if (cause instanceof IOException) { - throw new SolrServerException( - "IOException occured when talking to server at: " + getBaseURL(), cause); - } - throw new SolrServerException(cause.getMessage(), cause); + }); + return () -> req.abort(CANCELLED_EXCEPTION); + } + + @Override + public NamedList<Object> request(SolrRequest solrRequest, String collection) throws SolrServerException, IOException { + Request req = makeRequest(solrRequest, collection); + final ResponseParser parser = solrRequest.getResponseParser() == null + ? this.parser: solrRequest.getResponseParser(); + + try { + InputStreamResponseListener listener = new InputStreamResponseListener(); + req.send(listener); + Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); + return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (RuntimeException e) { + throw e; + } catch (Throwable e) { + throw createException(req, e); Review comment: Catching Throwable seems very broad to me, can we do something less aggressive here? 
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java ########## @@ -359,65 +366,95 @@ public void send(OutStream outStream, SolrRequest req, String collection) throws outStream.flush(); } - public NamedList<Object> request(SolrRequest solrRequest, - String collection, - OnComplete onComplete) throws IOException, SolrServerException { - Request req = makeRequest(solrRequest, collection); + private static final Exception CANCELLED_EXCEPTION = new Exception(); + + public Cancellable asyncRequest(SolrRequest solrRequest, String collection, OnComplete onComplete) { + Request req; + try { + req = makeRequest(solrRequest, collection); + } catch (SolrServerException | IOException e) { + onComplete.onFailure(e); + return () -> {}; + } final ResponseParser parser = solrRequest.getResponseParser() == null ? this.parser: solrRequest.getResponseParser(); - - if (onComplete != null) { - // This async call only suitable for indexing since the response size is limited by 5MB - req.onRequestQueued(asyncTracker.queuedListener) - .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) { - - @Override - public void onComplete(Result result) { - if (result.isFailed()) { - onComplete.onFailure(result.getFailure()); - return; + req.onRequestQueued(asyncTracker.queuedListener) + .onComplete(asyncTracker.completeListener) + .send(new InputStreamResponseListener() { + @Override + public void onHeaders(Response response) { + super.onHeaders(response); + InputStreamResponseListener listener = this; + executor.execute(() -> { + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); + try { + NamedList<Object> body = processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); + onComplete.onSuccess(body); + } catch (RemoteSolrException e) { + if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) { + onComplete.onFailure(e); + } + } catch 
(SolrServerException e) { + onComplete.onFailure(e); + } + }); } - NamedList<Object> rsp; - try { - InputStream is = getContentAsInputStream(); - assert ObjectReleaseTracker.track(is); - rsp = processErrorsAndResponse(result.getResponse(), - parser, is, getEncoding(), isV2ApiRequest(solrRequest)); - onComplete.onSuccess(rsp); - } catch (Exception e) { - onComplete.onFailure(e); + @Override + public void onFailure(Response response, Throwable failure) { + super.onFailure(response, failure); + if (failure != CANCELLED_EXCEPTION) { + onComplete.onFailure(createException(req, failure)); + } } - } - }); - return null; - } else { - try { - InputStreamResponseListener listener = new InputStreamResponseListener(); - req.send(listener); - Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); - InputStream is = listener.getInputStream(); - assert ObjectReleaseTracker.track(is); - return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new SolrServerException( - "Timeout occured while waiting response from server at: " + req.getURI(), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof ConnectException) { - throw new SolrServerException("Server refused connection at: " + req.getURI(), cause); - } - if (cause instanceof SolrServerException) { - throw (SolrServerException) cause; - } else if (cause instanceof IOException) { - throw new SolrServerException( - "IOException occured when talking to server at: " + getBaseURL(), cause); - } - throw new SolrServerException(cause.getMessage(), cause); + }); + return () -> req.abort(CANCELLED_EXCEPTION); + } + + @Override + public NamedList<Object> request(SolrRequest solrRequest, String collection) throws SolrServerException, IOException { + Request req = makeRequest(solrRequest, 
collection); + final ResponseParser parser = solrRequest.getResponseParser() == null + ? this.parser: solrRequest.getResponseParser(); + + try { + InputStreamResponseListener listener = new InputStreamResponseListener(); + req.send(listener); + Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); + return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (RuntimeException e) { + throw e; + } catch (Throwable e) { + throw createException(req, e); + } + } + + private SolrServerException createException(Request req, Throwable throwable) { Review comment: This whole method seems problematic, but I can't immediately think of what the alternative should look like. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org