dsmiley commented on code in PR #2402:
URL: https://github.com/apache/solr/pull/2402#discussion_r1562016978
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java:
##########
@@ -417,67 +419,101 @@ public void send(OutStream outStream, SolrRequest<?>
req, String collection) thr
outStream.flush();
}
- @SuppressWarnings("StaticAssignmentOfThrowable")
- private static final Exception CANCELLED_EXCEPTION = new Exception();
-
- private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () ->
{};
-
@Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener) {
- MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
- Request req;
+ asyncListener.onStart();
+ CompletableFuture<NamedList<Object>> cf =
+ requestAsync(solrRequest, collection)
+ .whenComplete(
+ (nl, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(nl);
+ }
+ });
+ return () -> cf.cancel(true);
+ }
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection) {
+ if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
+ collection = defaultCollection;
+ }
+ MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
+ CompletableFuture<NamedList<Object>> future = new CompletableFuture<>();
+ final MakeRequestReturnValue mrrv;
+ final String url;
try {
- String url = getRequestPath(solrRequest, collection);
- InputStreamResponseListener listener =
- new InputStreamReleaseTrackingResponseListener() {
- @Override
- public void onHeaders(Response response) {
- super.onHeaders(response);
- executor.execute(
- () -> {
- InputStream is = getInputStream();
- try {
- NamedList<Object> body =
- processErrorsAndResponse(solrRequest, response, is,
url);
- mdcCopyHelper.onBegin(null);
- log.debug("response processing success");
- asyncListener.onSuccess(body);
- } catch (RemoteSolrException e) {
- if (SolrException.getRootCause(e) !=
CANCELLED_EXCEPTION) {
+ url = getRequestPath(solrRequest, collection);
+ mrrv = makeRequest(solrRequest, url, true);
+ } catch (SolrServerException | IOException e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ final ResponseParser parser =
+ solrRequest.getResponseParser() == null ? this.parser :
solrRequest.getResponseParser();
+ mrrv.request
+ .onRequestQueued(asyncTracker.queuedListener)
+ .onComplete(asyncTracker.completeListener)
+ .send(
+ new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ executor.execute(
+ () -> {
+ InputStream is = listener.getInputStream();
+
+ // TODO: Original PR had this, but we do not close the
stream.
+ // The legacy implementation did not track the input
stream,
+ // or close it.
+ // assert ObjectReleaseTracker.track(is);
+
+ try {
+ NamedList<Object> body =
+ processErrorsAndResponse(solrRequest, response,
is, url);
+ mdcCopyHelper.onBegin(null);
+ log.debug("response processing success");
+ future.complete(body);
+ } catch (RemoteSolrException | SolrServerException e) {
mdcCopyHelper.onBegin(null);
log.debug("response processing failed", e);
- asyncListener.onFailure(e);
+ future.completeExceptionally(e);
+ } finally {
+ log.debug("response processing completed");
Review Comment:
for consistency with 2 cases right above, these 2 lines should be reversed.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java:
##########
@@ -247,7 +247,9 @@ private HttpClient createHttpClient(Builder builder) {
this.authenticationStore = new AuthenticationStoreHolder();
httpClient.setAuthenticationStore(this.authenticationStore);
- httpClient.setConnectTimeout(builder.connectionTimeoutMillis);
+ if (builder.connectionTimeoutMillis != null) {
Review Comment:
out-of-scope of this PR?
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java:
##########
@@ -417,67 +419,101 @@ public void send(OutStream outStream, SolrRequest<?>
req, String collection) thr
outStream.flush();
}
- @SuppressWarnings("StaticAssignmentOfThrowable")
- private static final Exception CANCELLED_EXCEPTION = new Exception();
-
- private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () ->
{};
-
@Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener) {
- MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
- Request req;
+ asyncListener.onStart();
+ CompletableFuture<NamedList<Object>> cf =
+ requestAsync(solrRequest, collection)
+ .whenComplete(
+ (nl, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(nl);
+ }
+ });
+ return () -> cf.cancel(true);
+ }
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection) {
+ if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
+ collection = defaultCollection;
+ }
+ MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
+ CompletableFuture<NamedList<Object>> future = new CompletableFuture<>();
+ final MakeRequestReturnValue mrrv;
+ final String url;
try {
- String url = getRequestPath(solrRequest, collection);
- InputStreamResponseListener listener =
- new InputStreamReleaseTrackingResponseListener() {
- @Override
- public void onHeaders(Response response) {
- super.onHeaders(response);
- executor.execute(
- () -> {
- InputStream is = getInputStream();
- try {
- NamedList<Object> body =
- processErrorsAndResponse(solrRequest, response, is,
url);
- mdcCopyHelper.onBegin(null);
- log.debug("response processing success");
- asyncListener.onSuccess(body);
- } catch (RemoteSolrException e) {
- if (SolrException.getRootCause(e) !=
CANCELLED_EXCEPTION) {
+ url = getRequestPath(solrRequest, collection);
+ mrrv = makeRequest(solrRequest, url, true);
+ } catch (SolrServerException | IOException e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ final ResponseParser parser =
+ solrRequest.getResponseParser() == null ? this.parser :
solrRequest.getResponseParser();
+ mrrv.request
+ .onRequestQueued(asyncTracker.queuedListener)
+ .onComplete(asyncTracker.completeListener)
+ .send(
+ new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ executor.execute(
+ () -> {
+ InputStream is = listener.getInputStream();
+
+ // TODO: Original PR had this, but we do not close the
stream.
Review Comment:
this is a "nocommit" since surely we won't commit/merge this in the end.
Anyway, I agree -- we don't need ObjectReleaseTracker usage here.
##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java:
##########
@@ -585,17 +597,21 @@ protected void testUpdateAsync() throws Exception {
}
}
- protected void testQueryAsync() throws Exception {
+ protected void testQueryAsync(boolean useDeprecatedApi) throws Exception {
ResponseParser rp = new XMLResponseParser();
DebugServlet.clear();
DebugServlet.addResponseHeader("Content-Type", "application/xml;
charset=UTF-8");
String url = getBaseUrl() + DEBUG_SERVLET_PATH;
HttpSolrClientBuilderBase<?, ?> b =
builder(url, DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
int limit = 10;
- CountDownLatch cdl = new CountDownLatch(limit);
- DebugAsyncListener[] listeners = new DebugAsyncListener[limit];
- Cancellable[] cancellables = new Cancellable[limit];
+
+ CountDownLatch cdl = new CountDownLatch(limit); // Deprecated API use
Review Comment:
nitpick: just call this "latch". Same goes elsewhere. Abbreviations like
this are hard to decipher later.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -155,43 +154,33 @@ public void submit(
return;
}
- // all variables that set inside this listener must be at least volatile
- responseCancellableMap.put(
- srsp,
- this.lbClient.asyncReq(
- lbReq,
- new AsyncListener<>() {
- volatile long startTime = System.nanoTime();
-
- @Override
- public void onStart() {
- SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
- if (requestInfo != null)
-
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
- }
-
- @Override
- public void onSuccess(LBSolrClient.Rsp rsp) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- responses.add(srsp);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
- }
- responses.add(srsp);
- }
- }));
+ long startTime = System.nanoTime();
+ SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+ if (requestInfo != null) {
+ req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+ }
+
+ CompletableFuture<LBSolrClient.Rsp> future =
this.lbClient.requestAsync(lbReq);
+ future.whenComplete(
+ (rsp, throwable) -> {
+ if (!future.isCompletedExceptionally()) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ responses.add(srsp);
+ } else if (!future.isCancelled()) {
Review Comment:
And come to think of it, you could factor out some common code by maybe just
wrapping the whole callback with this check. Then the "responses" gathering
and elapsedTime parts are done once, and it becomes clear that this is always
done.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -155,43 +154,33 @@ public void submit(
return;
}
- // all variables that set inside this listener must be at least volatile
- responseCancellableMap.put(
- srsp,
- this.lbClient.asyncReq(
- lbReq,
- new AsyncListener<>() {
- volatile long startTime = System.nanoTime();
-
- @Override
- public void onStart() {
- SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
- if (requestInfo != null)
-
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
- }
-
- @Override
- public void onSuccess(LBSolrClient.Rsp rsp) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- responses.add(srsp);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
- }
- responses.add(srsp);
- }
- }));
+ long startTime = System.nanoTime();
+ SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+ if (requestInfo != null) {
+ req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+ }
+
+ CompletableFuture<LBSolrClient.Rsp> future =
this.lbClient.requestAsync(lbReq);
+ future.whenComplete(
+ (rsp, throwable) -> {
+ if (!future.isCompletedExceptionally()) {
Review Comment:
It feels a little weird to refer to the future instead of the callback
arguments. In this spot, I think you can merely check that throwable is null.
##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBHttp2SolrClientTest.java:
##########
@@ -51,4 +67,249 @@ public void
testLBHttp2SolrClientWithTheseParamNamesInTheUrl() {
http2SolrClient.getUrlParamNames().toArray());
}
}
+
+ @Test
+ public void testAsyncDeprecated() {
+ testAsync(true);
+ }
+
+ @Test
+ public void testAsync() {
+ testAsync(false);
+ }
+
+ @Test
+ public void testAsyncWithFailures() {
+
+ // TODO: This demonstrates that the failing endpoint always gets retried,
but
+ // I would expect it to be labelled as a "zombie" and be skipped with
additional iterations.
+
+ LBSolrClient.Endpoint ep1 = new
LBSolrClient.Endpoint("http://endpoint.one");
+ LBSolrClient.Endpoint ep2 = new
LBSolrClient.Endpoint("http://endpoint.two");
+ List<LBSolrClient.Endpoint> endpointList = List.of(ep1, ep2);
+
+ Http2SolrClient.Builder b = new Http2SolrClient.Builder("http://base.url");
+ try (MockHttp2SolrClient client = new
MockHttp2SolrClient("http://base.url", b);
+ LBHttp2SolrClient testClient = new LBHttp2SolrClient.Builder(client,
ep1, ep2).build()) {
+
+ for (int j = 0; j < 2; j++) {
+ // j: first time Endpoint One will retrun error code 500.
+ // second time Endpoint One will be healthy
+
+ String basePathToSucceed;
+ if (j == 0) {
+ client.basePathToFail = ep1.getBaseUrl();
+ basePathToSucceed = ep2.getBaseUrl();
+ } else {
+ client.basePathToFail = ep2.getBaseUrl();
+ basePathToSucceed = ep1.getBaseUrl();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ // i: we'll try 10 times to see if it behaves the same every time.
+
+ QueryRequest queryRequest = new QueryRequest(new
MapSolrParams(Map.of("q", "" + i)));
+ LBSolrClient.Req req = new LBSolrClient.Req(queryRequest,
endpointList);
+ String iterMessage = "iter j/i " + j + "/" + i;
+ try {
+ testClient.requestAsync(req).get(1, TimeUnit.MINUTES);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (TimeoutException | ExecutionException e) {
+ fail(iterMessage + " Response ended in failure: " + e);
+ }
+ if (j == 0) {
+ // The first endpoint gives an exception, so it retries.
+ assertEquals(iterMessage, 2, client.lastBasePaths.size());
+
+ String failedBasePath = client.lastBasePaths.remove(0);
+ assertEquals(iterMessage, client.basePathToFail, failedBasePath);
+ } else {
+ // The first endpoint does not give the exception, it doesn't
retry.
+ assertEquals(iterMessage, 1, client.lastBasePaths.size());
+ }
+ String successBasePath = client.lastBasePaths.remove(0);
+ assertEquals(iterMessage, basePathToSucceed, successBasePath);
+ }
+ }
+ }
+ }
+
+ private void testAsync(boolean useDeprecatedApi) {
+ LBSolrClient.Endpoint ep1 = new
LBSolrClient.Endpoint("http://endpoint.one");
+ LBSolrClient.Endpoint ep2 = new
LBSolrClient.Endpoint("http://endpoint.two");
+ List<LBSolrClient.Endpoint> endpointList = List.of(ep1, ep2);
+
+ Http2SolrClient.Builder b = new Http2SolrClient.Builder("http://base.url");
+ try (MockHttp2SolrClient client = new
MockHttp2SolrClient("http://base.url", b);
+ LBHttp2SolrClient testClient = new LBHttp2SolrClient.Builder(client,
ep1, ep2).build()) {
+
+ int limit = 10; // For simplicity use an even limit
+ int halfLimit = limit / 2; // see TODO below
+
+ CountDownLatch cdl = new CountDownLatch(limit); // deprecated API use
+ List<LBTestAsyncListener> listeners = new ArrayList<>(); // deprecated
API use
+ List<CompletableFuture<LBSolrClient.Rsp>> responses = new ArrayList<>();
+
+ for (int i = 0; i < limit; i++) {
+ QueryRequest queryRequest = new QueryRequest(new
MapSolrParams(Map.of("q", "" + i)));
+ LBSolrClient.Req req = new LBSolrClient.Req(queryRequest,
endpointList);
+ if (useDeprecatedApi) {
+ LBTestAsyncListener listener = new LBTestAsyncListener(cdl);
+ listeners.add(listener);
+ testClient.asyncReq(req, listener);
+ } else {
+ responses.add(testClient.requestAsync(req));
+ }
+ }
+
+ if (useDeprecatedApi) {
+ try {
+ // This is just a formality. This is a single-threaded test.
+ cdl.await(1, TimeUnit.MINUTES);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ }
+ }
+
+ QueryRequest[] queryRequests = new QueryRequest[limit];
+ int numEndpointOne = 0;
+ int numEndpointTwo = 0; // see TODO below
+ for (int i = 0; i < limit; i++) {
+ SolrRequest<?> lastSolrReq = client.lastSolrRequests.get(i);
+ assertTrue(lastSolrReq instanceof QueryRequest);
+ QueryRequest lastQueryReq = (QueryRequest) lastSolrReq;
+ int index = Integer.parseInt(lastQueryReq.getParams().get("q"));
+ assertNull("Found same request twice: " + index, queryRequests[index]);
+ queryRequests[index] = lastQueryReq;
+ if (lastQueryReq.getBasePath().equals(ep1.toString())) {
+ numEndpointOne++;
+ } else if (lastQueryReq.getBasePath().equals(ep2.toString())) {
+ numEndpointTwo++;
+ }
+ NamedList<Object> lastResponse;
+ if (useDeprecatedApi) {
+ LBTestAsyncListener lastAsyncListener = listeners.get(index);
+ assertTrue(lastAsyncListener.onStartCalled);
+ assertNull(lastAsyncListener.failure);
+ assertNotNull(lastAsyncListener.success);
+ lastResponse = lastAsyncListener.success.getResponse();
+ } else {
+ LBSolrClient.Rsp lastRsp = null;
+ try {
+ lastRsp = responses.get(index).get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (ExecutionException ee) {
+ fail("Response " + index + " ended in failure: " + ee);
+ }
+ lastResponse = lastRsp.getResponse();
+ }
+
+ // The Mock will return {"response": index}.
+ assertEquals("" + index, lastResponse.get("response"));
+ }
+
+ // TODO: LBHttp2SolrClient creates a new "endpoint iterator" per
request, thus
Review Comment:
Huh. I think they should behave the same, first of all, because it's not
pertinent to the HTTP Client. But I think the first is supposed to be chosen
because the order can be chosen intentionally (say prefer the replica that is a
leader, or prefer pull replicas, etc.) and passed in sorted by such rules.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -155,43 +154,33 @@ public void submit(
return;
}
- // all variables that set inside this listener must be at least volatile
- responseCancellableMap.put(
- srsp,
- this.lbClient.asyncReq(
- lbReq,
- new AsyncListener<>() {
- volatile long startTime = System.nanoTime();
-
- @Override
- public void onStart() {
- SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
- if (requestInfo != null)
-
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
- }
-
- @Override
- public void onSuccess(LBSolrClient.Rsp rsp) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- responses.add(srsp);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
- }
- responses.add(srsp);
- }
- }));
+ long startTime = System.nanoTime();
+ SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+ if (requestInfo != null) {
+ req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+ }
+
+ CompletableFuture<LBSolrClient.Rsp> future =
this.lbClient.requestAsync(lbReq);
+ future.whenComplete(
+ (rsp, throwable) -> {
+ if (!future.isCompletedExceptionally()) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ responses.add(srsp);
+ } else if (!future.isCancelled()) {
Review Comment:
Or say `!(throwable instanceof CancellationException)`
##########
solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java:
##########
@@ -18,9 +18,11 @@
package org.apache.solr.client.solrj.util;
/**
- * The return type for solrJ asynchronous requests, providing a mechanism
whereby callers may
- * request cancellation.
+ * @deprecated Use the async variants that return CompletableFuture.
Review Comment:
This line is supposed to be at the end of the javadoc; it isn't supposed to
encompass the current description.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]