This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch branch_9_9
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9_9 by this push:
new f94343603a4 SOLR-17792: Fix deadlocks in ParallelHttpShardHandler
(#3398)
f94343603a4 is described below
commit f94343603a494eb2df147f615fafa646d9e67a1b
Author: Houston Putman <[email protected]>
AuthorDate: Fri Jul 11 14:25:05 2025 -0700
SOLR-17792: Fix deadlocks in ParallelHttpShardHandler (#3398)
Rework how HttpShardHandler performs synchronization
(cherry picked from commit 2a5cc9d97b059d8264b8a80d386ca205e4c1e8a9)
---
solr/CHANGES.txt | 2 +
.../solr/handler/component/HttpShardHandler.java | 230 ++++++++++-----------
.../component/ParallelHttpShardHandler.java | 77 +++----
.../solr/handler/component/SearchHandler.java | 1 -
.../solr/client/solrj/impl/LBHttp2SolrClient.java | 13 +-
5 files changed, 156 insertions(+), 167 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0d7a0ab90c3..2c83f2c5e42 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -148,6 +148,8 @@ Bug Fixes
* SOLR-17777: Fix a very minor memory leak in metrics reporting code when a
core is deleted. (Pierre Salagnac)
+* SOLR-17792: Fix deadlocks in ParallelHttpShardHandler, re-implement
synchronization in HttpShardHandler (Houston Putman)
+
Dependency Upgrades
---------------------
* PR#3078: chore(deps): update apache.kafka to v3.9.0 (solrbot)
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 219197d8cd8..4a0e9b46d73 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -19,15 +19,17 @@ package org.apache.solr.handler.component;
import static org.apache.solr.common.params.CommonParams.PARTIAL_RESULTS;
import static org.apache.solr.request.SolrQueryRequest.disallowPartialResults;
+import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
+import java.util.concurrent.atomic.AtomicBoolean;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
@@ -51,6 +53,8 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.security.AllowListUrlChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Solr's default {@link ShardHandler} implementation; uses Jetty's async HTTP
Client APIs for
@@ -66,6 +70,9 @@ import org.apache.solr.security.AllowListUrlChecker;
@NotThreadSafe
public class HttpShardHandler extends ShardHandler {
+ @SuppressWarnings("unused")
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* If the request context map has an entry with this key and Boolean.TRUE as
value, {@link
* #prepDistributed(ResponseBuilder)} will only include {@link
@@ -74,43 +81,20 @@ public class HttpShardHandler extends ShardHandler {
* org.apache.solr.common.cloud.Replica.Type#TLOG}). This is used by the
RealtimeGet handler,
* since other types of replicas shouldn't respond to RTG requests
*/
- public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
+ public static final String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
- private final HttpShardHandlerFactory httpShardHandlerFactory;
-
- /*
- * Three critical fields:
- * - pending: keeps track of how many things we started
- * - responseFutureMap: holds futures for anything not yet complete
- * - responses: the result of things we started, when responses.size()
- *
- * All of this must be kept consistent and is therefore synchronized on
RESPONSES_LOCK
- * The exception is when a response is added so long as pending is
incremented first
- * because responses is a LinkedBlockingQueue and that is synchronized. The
response
- * future map is not synchronized however, and so we need to guard it for
both order
- * and memory consistency (happens before) reasons.
- *
- * The code works by looping/decrementing pending until responses.size()
matches the
- * size of the shard list. Thus, there is a tricky, hidden assumption of one
response
- * for every shard, even if the shard is down (so we add a fake response
with a shard
- * down exception). Note that down shards have a shard url of empty string
in this case.
- *
- * This seems overcomplicated. Perhaps this can someday be changed to simply
- * test responses.size == pending.size?
+ /**
+ * This is a fake ShardResponse used internally to trigger the {@link
#take(boolean)} method to
+ * stop waiting and cancel outstanding shard requests.
*/
- protected final Object FUTURE_MAP_LOCK = new Object();
+ private static final ShardResponse CANCELLATION_NOTIFICATION = new
ShardResponse();
- protected Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>>
responseFutureMap;
- protected BlockingQueue<ShardResponse> responses;
+ private final HttpShardHandlerFactory httpShardHandlerFactory;
- /**
- * The number of pending requests. This must be incremented before a {@link
ShardResponse} is
- * added to {@link #responses}, and decremented after a ShardResponse is
removed from {@code
- * responses}. We can't rely on responseFutureMap.size() because it is an
unsynchronized
- * collection updated by multiple threads, and it's internal state including
the size field is not
- * volatile/synchronized.
- */
- protected AtomicInteger pending;
+ protected final ConcurrentMap<ShardResponse,
CompletableFuture<LBSolrClient.Rsp>>
+ responseFutureMap;
+ protected final BlockingQueue<ShardResponse> responses;
+ private final AtomicBoolean canceled = new AtomicBoolean(false);
private final Map<String, List<String>> shardToURLs;
protected LBHttp2SolrClient lbClient;
@@ -118,9 +102,8 @@ public class HttpShardHandler extends ShardHandler {
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
this.httpShardHandlerFactory = httpShardHandlerFactory;
this.lbClient = httpShardHandlerFactory.loadbalancer;
- this.pending = new AtomicInteger(0);
this.responses = new LinkedBlockingQueue<>();
- this.responseFutureMap = new HashMap<>();
+ this.responseFutureMap = new ConcurrentHashMap<>();
// maps "localhost:8983|localhost:7574" to a shuffled
// List("http://localhost:8983","http://localhost:7574")
@@ -225,24 +208,21 @@ public class HttpShardHandler extends ShardHandler {
return srsp;
}
- private void recordNoUrlShardResponse(ShardResponse srsp, String shard) {
- // TODO: what's the right error code here? We should use the same thing
when
- // all of the servers for a shard are down.
- // TODO: shard is a blank string in this case, which is somewhat less than
helpful
- SolrException exception =
- new SolrException(
- SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting
shard: " + shard);
+ protected void recordShardSubmitError(ShardResponse srsp, SolrException
exception) {
srsp.setException(exception);
srsp.setResponseCode(exception.code());
- // order of next two statements is important. Both are synchronized
objects so
- // synchronization is needed so long as the order is correct.
- pending.incrementAndGet();
- responses.add(srsp);
+ synchronized (canceled) {
+ if (!canceled.get()) {
+ responses.add(srsp);
+ }
+ }
}
@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams
params) {
+ // Since we are submitting new shard requests, the request is not canceled
+ canceled.set(false);
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final var lbReq = prepareLBRequest(sreq, shard, params, urls);
@@ -250,7 +230,10 @@ public class HttpShardHandler extends ShardHandler {
final var ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
if (urls.isEmpty()) {
- recordNoUrlShardResponse(srsp, shard);
+ recordShardSubmitError(
+ srsp,
+ new SolrException(
+ SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting
shard: " + shard));
return;
}
long startTimeNS = System.nanoTime();
@@ -281,15 +264,39 @@ public class HttpShardHandler extends ShardHandler {
ShardResponse srsp,
long startTimeNS) {
CompletableFuture<LBSolrClient.Rsp> future =
this.lbClient.requestAsync(lbReq);
- future.whenComplete(new ShardRequestCallback(ssr, srsp, startTimeNS, sreq,
shard, params));
- synchronized (FUTURE_MAP_LOCK) {
- // we want to ensure that there is a future in flight before incrementing
- // pending. If anything fails such that a request/future is not created
there is
- // potential for the request to hang forever waiting on a
responses.take()
- // and so if anything failed during future creation we would get stuck.
- pending.incrementAndGet();
- responseFutureMap.put(srsp, future);
+ // Synchronize on canceled, so that we know precisely whether to add it to
the responseFutureMap
+ // or not.
+ synchronized (canceled) {
+ if (canceled.get()) {
+ future.cancel(true);
+ return;
+ } else {
+ responseFutureMap.put(srsp, future);
+ }
}
+ // Add the callback explicitly after adding the future to the map, because
the callback relies
+ // on the map already having the future.
+ future.whenComplete(
+ (LBSolrClient.Rsp rsp, Throwable throwable) -> {
+ if (rsp != null) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ } else if (throwable != null) {
+ srsp.setException(throwable);
+ if (throwable instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) throwable).code());
+ }
+ }
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS,
TimeUnit.NANOSECONDS);
// Synchronize on canceled so that this callback and cancelAll() cannot
run at the same time
+ synchronized (canceled) {
+ // We don't want to add responses after the requests have been
canceled
+ if (responseFutureMap.containsKey(srsp)) {
+ responses.add(HttpShardHandler.this.transformResponse(sreq,
srsp, shard));
+ }
+ }
+ });
}
/** Subclasses could modify the request based on the shard */
@@ -318,31 +325,36 @@ public class HttpShardHandler extends ShardHandler {
}
private ShardResponse take(boolean bailOnError) {
+ ShardResponse previousResponse = null;
try {
while (responsesPending()) {
- ShardResponse rsp;
- synchronized (FUTURE_MAP_LOCK) {
- // in the parallel case we need to recheck responsesPending()
- // in case all attempts to submit failed.
- rsp = responses.poll(50, TimeUnit.MILLISECONDS);
- if (rsp == null) {
- continue;
- }
+ ShardResponse rsp = responses.take();
+ if (rsp == CANCELLATION_NOTIFICATION) {
+ // This is only queued in cancelAll(), so all outstanding futures
have already been
+ // canceled.
+ responses.clear();
+
+ // We want to return the last response we received, if possible
+ return previousResponse;
+ } else {
responseFutureMap.remove(rsp);
- pending.decrementAndGet();
- }
- if (bailOnError && rsp.getException() != null)
- return rsp; // if exception, return immediately
- // add response to the response list... we do this after the take() and
- // not after the completion of "call" so we know when the last response
- // for a request was received. Otherwise we might return the same
- // request more than once.
- rsp.getShardRequest().responses.add(rsp);
+ // add response to the response list... we do this after the take()
and
+ // not after the completion of "call" so we know when the last
response
+ // for a request was received. Otherwise, we might return the same
+ // request more than once.
+ rsp.getShardRequest().responses.add(rsp);
+
+ if (rsp.getException() != null
+ && (bailOnError ||
disallowPartialResults(rsp.getShardRequest().params))) {
+ cancelAll();
+ }
+ }
if (rsp.getShardRequest().responses.size() ==
rsp.getShardRequest().actualShards.length) {
return rsp;
}
+ previousResponse = rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -351,15 +363,30 @@ public class HttpShardHandler extends ShardHandler {
}
protected boolean responsesPending() {
- return pending.get() > 0;
+ return !responseFutureMap.isEmpty() || !responses.isEmpty();
}
@Override
public void cancelAll() {
- synchronized (FUTURE_MAP_LOCK) {
+ // Canceled must be set to true before calling the cancellation code, to
ensure that new tasks
+ // are not enqueued after the outstanding requests have been canceled.
// This code isn't perfectly thread-safe, and there can be a
race condition, but for our purposes
+ // it should be fine. Failing to cancel a request, a very small percentage
of the time, will
+ // have very little noticeable effect. And even if the request is sent
after cancellation, the
+ // responses will not be recorded.
+ // Queue a fake response to notify take() that it should no longer wait on
responses as the
+ // outstanding requests have been canceled
+ synchronized (canceled) {
+ boolean alreadyCanceled = canceled.getAndSet(true);
+ if (!alreadyCanceled) {
+ // We don't want to queue this multiple times if we are already
canceled
+ responses.add(CANCELLATION_NOTIFICATION);
+ }
+ // Cancel all outstanding requests
for (CompletableFuture<LBSolrClient.Rsp> future :
responseFutureMap.values()) {
- future.cancel(true);
- pending.decrementAndGet();
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
}
responseFutureMap.clear();
}
@@ -500,51 +527,4 @@ public class HttpShardHandler extends ShardHandler {
public ShardHandlerFactory getShardHandlerFactory() {
return httpShardHandlerFactory;
}
-
- class ShardRequestCallback implements BiConsumer<LBSolrClient.Rsp,
Throwable> {
- private final SimpleSolrResponse ssr;
- private final ShardResponse srsp;
- private final long startTimeNS;
- private final ShardRequest sreq;
- private final String shard;
- private final ModifiableSolrParams params;
-
- public ShardRequestCallback(
- SimpleSolrResponse ssr,
- ShardResponse srsp,
- long startTimeNS,
- ShardRequest sreq,
- String shard,
- ModifiableSolrParams params) {
- this.ssr = ssr;
- this.srsp = srsp;
- this.startTimeNS = startTimeNS;
- this.sreq = sreq;
- this.shard = shard;
- this.params = params;
- }
-
- @Override
- public void accept(LBSolrClient.Rsp rsp, Throwable throwable) {
- if (rsp != null) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS,
TimeUnit.NANOSECONDS);
- responses.add(HttpShardHandler.this.transformResponse(sreq, srsp,
shard));
- } else if (throwable != null) {
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS,
TimeUnit.NANOSECONDS);
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
- }
- responses.add(HttpShardHandler.this.transformResponse(sreq, srsp,
shard));
- if (disallowPartialResults(params)) {
- HttpShardHandler.this
- .cancelAll(); // Note: method synchronizes
RESPONSE_CANCELABLE_LOCK on entry
- }
- }
- }
- }
}
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
index 7d6cf13874f..c1d6f9c2007 100644
---
a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
+++
b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java
@@ -18,10 +18,13 @@ package org.apache.solr.handler.component;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.FutureTask;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,34 +49,25 @@ public class ParallelHttpShardHandler extends
HttpShardHandler {
/*
* Unlike the basic HttpShardHandler, this class allows us to exit submit
before
- * pending is incremented and the responseFutureMap is updated. If the
runnables that
+ * the responseFutureMap is updated. If the runnables that
* do that are slow to execute the calling code could attempt to
takeCompleted(),
* while pending is still zero. In this condition, the code would assume
that all
* requests are processed (despite the runnables created by this class still
* waiting). Thus, we need to track that there are attempts still in flight.
- *
- * This tracking is complicated by the fact that there could be a failure in
the
- * runnable that causes the request to never be created and pending to never
be
- * incremented. Thus, we need to know that we have attempted something AND
that that
- * attempt has also been processed by the executor.
- *
- * This condition is added to the check that controls the loop in take via
the
- * override for #responsesPending(). We rely on calling code call submit for
all
- * requests desired before the call to takeCompleted()
*/
- AtomicInteger attemptStart = new AtomicInteger(0);
- AtomicInteger attemptCount = new AtomicInteger(0);
+ private final ConcurrentMap<ShardResponse, FutureTask<Void>> submitFutures;
public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory
httpShardHandlerFactory) {
super(httpShardHandlerFactory);
this.commExecutor = httpShardHandlerFactory.commExecutor;
+ this.submitFutures = new ConcurrentHashMap<>();
}
@Override
protected boolean responsesPending() {
// ensure we can't exit while loop in HttpShardHandler.take(boolean) until
we've completed
- // as many Runnable actions as we created.
- return super.responsesPending() || attemptStart.get() > attemptCount.get();
+ // submitting all of the shard requests
+ return super.responsesPending() || !submitFutures.isEmpty();
}
@Override
@@ -85,32 +79,41 @@ public class ParallelHttpShardHandler extends
HttpShardHandler {
SimpleSolrResponse ssr,
ShardResponse srsp,
long startTimeNS) {
- final Runnable executeRequestRunnable =
- () -> {
+ FutureTask<Void> futureTask =
+ new FutureTask<>(
+ () -> super.makeShardRequest(sreq, shard, params, lbReq, ssr,
srsp, startTimeNS), null);
+ CompletableFuture<Void> completableFuture =
+ CompletableFuture.runAsync(futureTask, commExecutor);
+ submitFutures.put(srsp, futureTask);
+ completableFuture.whenComplete(
+ (r, t) -> {
try {
- CompletableFuture<LBSolrClient.Rsp> future =
this.lbClient.requestAsync(lbReq);
- future.whenComplete(
- new ShardRequestCallback(ssr, srsp, startTimeNS, sreq, shard,
params));
- synchronized (FUTURE_MAP_LOCK) {
- // we want to ensure that there is a future in flight before
incrementing
- // pending, because there is a risk that the request will hang
forever waiting
- // on a responses.take() in HttpShardHandler.take(boolean) if
anything failed
- // during future creation. It is not a problem if the response
shows up before
- // we increment pending. The attemptingSubmit flag guards us
against inadvertently
- // skipping the while loop in HttpShardHandler.take(boolean)
until at least
- // one runnable has been executed.
- pending.incrementAndGet();
- responseFutureMap.put(srsp, future);
+ if (t != null) {
+ recordShardSubmitError(
+ srsp,
+ new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Exception occurred while trying to send a request to
shard: " + shard,
+ t));
}
} finally {
- // it must not be possible to exit the runnable in any way without
calling this.
- attemptCount.incrementAndGet();
+ // Remove so that we keep track of in-flight submits only
+ submitFutures.remove(srsp);
}
- };
+ });
+ }
- // not clear how errors emanating from requestAsync or the whenComplete()
callback
- // are to propagated out of the runnable?
- attemptStart.incrementAndGet();
- CompletableFuture.runAsync(executeRequestRunnable, commExecutor);
+ @Override
+ public void cancelAll() {
+ super.cancelAll();
+ submitFutures
+ .values()
+ .forEach(
+ future -> {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ });
+ submitFutures.clear();
}
}
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
index 0b34e6ffdef..91abd23447e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
@@ -633,7 +633,6 @@ public class SearchHandler extends RequestHandlerBase
if (srsp.getException() != null) {
// If things are not tolerant, abort everything and rethrow
if (!tolerant) {
- shardHandler1.cancelAll();
throwSolrException(srsp.getException());
} else {
// Check if the purpose includes 'PURPOSE_GET_TOP_IDS'
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index e119598559c..646a97d6e18 100644
---
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -243,9 +243,14 @@ public class LBHttp2SolrClient extends LBSolrClient {
}
MDC.put("LBSolrClient.url", url.toString());
if (!apiFuture.isCancelled()) {
- CompletableFuture<NamedList<Object>> future =
- doAsyncRequest(url, req, rsp, isNonRetryable,
it.isServingZombieServer(), this);
- currentFuture.set(future);
+ try {
+ CompletableFuture<NamedList<Object>> future =
+ doAsyncRequest(
+ url, req, rsp, isNonRetryable,
it.isServingZombieServer(), this);
+ currentFuture.set(future);
+ } catch (Throwable ex) {
+ apiFuture.completeExceptionally(ex);
+ }
}
} else {
apiFuture.completeExceptionally(e);
@@ -366,7 +371,7 @@ public class LBHttp2SolrClient extends LBSolrClient {
} else {
listener.onFailure(e, false);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
listener.onFailure(new SolrServerException(e), false);
}
}