Copilot commented on code in PR #4440:
URL: https://github.com/apache/solr/pull/4440#discussion_r3255870835
##########
solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java:
##########
@@ -107,4 +124,287 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception {
recorded.getException());
assertTrue(recorded.getException() instanceof SolrException);
}
+
+ /**
+ * Verifies the contract that when the commExecutor rejects the runnable, the failure is recorded
+ * via recordShardSubmitError (i.e., shows up in the responses queue) rather than being propagated
+ * synchronously to the caller.
+ *
+ * <p>This exercises issue #1 from the ParallelHttpShardHandler review: with a single-thread
+ * ThreadPoolExecutor backed by a SynchronousQueue, once the worker is busy, the next
+ * CompletableFuture.runAsync(...) call throws RejectedExecutionException synchronously out of
+ * makeShardRequest. The expected (post-fix) behavior is that the error is routed through
+ * recordShardSubmitError instead.
+ */
+ @Test
+ public void testRejectedExecutorRecordsErrorInsteadOfThrowing() throws Exception {
+ CountDownLatch holdWorker = new CountDownLatch(1);
+ CountDownLatch workerStarted = new CountDownLatch(1);
+ ThreadPoolExecutor busyExecutor =
+ new ThreadPoolExecutor(
+ 1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); // default AbortPolicy
+ try {
+ // Occupy the single worker thread so the next submission has nowhere to go.
+ busyExecutor.execute(
+ () -> {
+ workerStarted.countDown();
+ try {
+ holdWorker.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ assertTrue("worker did not start within timeout", workerStarted.await(5, TimeUnit.SECONDS));
+
+ ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory();
+ factory.commExecutor = busyExecutor;
+ ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory);
+
+ ShardRequest shardRequest = buildShardRequest("shardA");
+
+ ShardResponse shardResponse = new ShardResponse();
+ shardResponse.setShardRequest(shardRequest);
+ shardResponse.setShard("shardA");
+
+ HttpShardHandler.SimpleSolrResponse simpleResponse =
+ new HttpShardHandler.SimpleSolrResponse();
+ shardResponse.setSolrResponse(simpleResponse);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ QueryRequest queryRequest = new QueryRequest(params);
+ LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr");
+ LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint));
+
+ // The desired contract: rejection is captured and surfaced through the responses queue
+ // (i.e., this call should not throw RejectedExecutionException).
+ try {
+ handler.makeShardRequest(
+ shardRequest,
+ "shardA",
+ params,
+ lbReq,
+ simpleResponse,
+ shardResponse,
+ System.nanoTime());
+ } catch (RejectedExecutionException ree) {
+ fail(
+ "makeShardRequest should not propagate RejectedExecutionException; the failure "
+ + "should be recorded via recordShardSubmitError. Got: "
+ + ree);
+ }
+
+ ShardResponse recorded = handler.responses.poll(2, TimeUnit.SECONDS);
+ assertNotNull(
+ "Expected the executor rejection to be recorded as a shard failure in the responses"
+ + " queue, but no response arrived",
+ recorded);
+ assertSame(
+ "The recorded shard response should be the same instance passed in",
+ shardResponse,
+ recorded);
+ assertNotNull(
+ "Expected an exception to be attached to the recorded shard response",
+ recorded.getException());
+ } finally {
+ holdWorker.countDown();
+ busyExecutor.shutdownNow();
+ busyExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ private ShardRequest buildShardRequest(String shard) {
+ ShardRequest sreq = new ShardRequest();
+ sreq.params = new ModifiableSolrParams();
+ sreq.actualShards = new String[] {shard};
+ return sreq;
+ }
+
+ /**
+ * Runs handler.takeCompletedIncludingErrors() on a worker thread with a timeout. If take() does
+ * not return within timeoutMs, fails the test with a clear message naming the iteration and phase
+ * — this is the signal for the lost-wakeup bug.
+ */
+ private ShardResponse runTakeWithTimeout(
+ ParallelHttpShardHandler handler,
+ ExecutorService takeExecutor,
+ int iteration,
+ String phaseLabel,
+ long timeoutMs)
+ throws Exception {
+ Future<ShardResponse> future = takeExecutor.submit(handler::takeCompletedIncludingErrors);
+ try {
+ return future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException te) {
+ future.cancel(true);
+ fail(
+ "take() hung in iteration "
+ + iteration
+ + " "
+ + phaseLabel
+ + ": did not return within "
+ + timeoutMs
+ + "ms. The worker thread is parked in LinkedBlockingQueue.take() waiting for"
+ + " an element that will never arrive because the handler's state transitioned"
+ + " to empty without anything being enqueued on the responses queue.");
+ throw new AssertionError("unreachable");
Review Comment:
The `throw new AssertionError("unreachable")` can never execute, because
`fail(...)` always throws `AssertionError`. It cannot simply be deleted,
though: the compiler does not know that `fail(...)` never returns, so a method
that returns `ShardResponse` still needs this catch block to end in a `throw`
(the `throws Exception` clause does not change that). A cleaner shape is to
drop the `fail(...)` call and throw directly, e.g. `throw new
AssertionError("take() hung in iteration " + ...)`. Minor.
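A minimal sketch of the direct-throw shape, assuming a simplified stand-in for
`runTakeWithTimeout`. The class name `TakeWithTimeoutSketch`, the generic
`Callable` parameter, and the shortened message are hypothetical and not taken
from the PR:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TakeWithTimeoutSketch {

  // Hypothetical stand-in for runTakeWithTimeout: runs the task on the executor
  // and turns a timeout into an AssertionError thrown directly from the catch.
  static <T> T runWithTimeout(
      ExecutorService executor, Callable<T> task, int iteration, String phaseLabel, long timeoutMs)
      throws InterruptedException {
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
      future.cancel(true);
      // One throw both fails the test and ends the catch block abruptly,
      // so the compiler needs no extra "unreachable" statement.
      throw new AssertionError(
          "take() hung in iteration " + iteration + " " + phaseLabel
              + ": did not return within " + timeoutMs + "ms",
          te);
    } catch (ExecutionException ee) {
      throw new AssertionError(
          "take() threw unexpectedly in iteration " + iteration + " " + phaseLabel,
          ee.getCause());
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      String fast = runWithTimeout(executor, () -> "ok", 0, "fast", 1_000);
      System.out.println(fast);
      try {
        // The task outlives the 100 ms budget, so the helper throws.
        runWithTimeout(executor, () -> { Thread.sleep(5_000); return "late"; }, 1, "slow", 100);
      } catch (AssertionError expected) {
        System.out.println(expected.getMessage());
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Passing the `TimeoutException` as the cause also keeps the original stack trace
in the test report, which `fail(String)` would drop.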
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -96,6 +96,18 @@ public class HttpShardHandler extends ShardHandler {
protected final BlockingQueue<ShardResponse> responses;
private final AtomicBoolean canceled = new AtomicBoolean(false);
+ // Returns the monitor object that guards all cancellation-related state transitions
+ // (the canceled flag, the responseFutureMap, the responses queue's CANCELLATION_NOTIFICATION).
+ // Subclasses that track additional cancellable state must synchronize on this monitor when
+ // reading or transitioning that state so the whole cancellation invariant stays atomic.
+ protected final Object cancellationLock() {
+ return canceled;
+ }
+
+ protected final boolean isCanceled() {
+ return canceled.get();
+ }
Review Comment:
Exposing `cancellationLock()` as a `protected` method returning the internal
`canceled` `AtomicBoolean` as a bare `Object` couples subclasses to an internal
implementation detail (the identity of the `canceled` field, which is also used
for its `get()/set()` semantics elsewhere). Future refactors of how
cancellation state is stored (e.g. moving `canceled` behind another
abstraction, replacing it with a `StampedLock`, splitting it per-request) would
now silently break any subclass synchronizing on this object.
A more maintainable shape would be either (a) a dedicated final `Object
cancellationLock = new Object();` field that is the only thing exposed, with
`canceled`'s mutations also moved behind it, or (b) replacing the
`synchronized(canceled)` pattern entirely with a `ReentrantLock`/`Condition`
that lets the subclass-tracker work *with* a real condition variable (which
would also let you eliminate the new 50ms polling in `take()`).
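A rough sketch of option (b), under heavy simplification. The class
`CancellationStateSketch`, its `String` responses, and the `pending` counter are
hypothetical stand-ins and do not mirror the real `HttpShardHandler` fields. It
only illustrates how a dedicated `ReentrantLock` with a `Condition` lets a
waiter wake up on any state change without polling:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class CancellationStateSketch {
  // Dedicated lock: callers never depend on how the canceled flag is stored.
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition stateChanged = lock.newCondition();
  private final Queue<String> responses = new ArrayDeque<>(); // guarded by lock
  private boolean canceled; // guarded by lock
  private int pending; // guarded by lock; submitted but not yet answered

  void submitted() {
    lock.lock();
    try {
      pending++;
    } finally {
      lock.unlock();
    }
  }

  void completed(String response) {
    lock.lock();
    try {
      if (canceled) {
        return; // responses arriving after cancellation are dropped
      }
      pending--;
      responses.add(response);
      stateChanged.signalAll();
    } finally {
      lock.unlock();
    }
  }

  void cancelAll() {
    lock.lock();
    try {
      canceled = true;
      pending = 0;
      responses.clear();
      stateChanged.signalAll(); // wake any waiter so it can observe the new state
    } finally {
      lock.unlock();
    }
  }

  /** Blocks until a response is available; returns null once nothing is pending. */
  String take() throws InterruptedException {
    lock.lock();
    try {
      while (responses.isEmpty() && pending > 0) {
        stateChanged.await();
      }
      return responses.poll();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    CancellationStateSketch state = new CancellationStateSketch();
    state.submitted();
    Thread completer = new Thread(() -> state.completed("shardA response"));
    completer.start();
    System.out.println(state.take());
    completer.join();

    state.submitted();
    state.cancelAll();
    System.out.println(state.take());
  }
}
```

Because every transition that can make `take()` return happens under the same
lock and signals the same condition, a waiter cannot miss a wakeup.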
##########
solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java:
##########
@@ -107,4 +124,287 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception {
recorded.getException());
assertTrue(recorded.getException() instanceof SolrException);
}
+
+ /**
+ * Verifies the contract that when the commExecutor rejects the runnable, the failure is recorded
+ * via recordShardSubmitError (i.e., shows up in the responses queue) rather than being propagated
+ * synchronously to the caller.
+ *
+ * <p>This exercises issue #1 from the ParallelHttpShardHandler review: with a single-thread
+ * ThreadPoolExecutor backed by a SynchronousQueue, once the worker is busy, the next
+ * CompletableFuture.runAsync(...) call throws RejectedExecutionException synchronously out of
+ * makeShardRequest. The expected (post-fix) behavior is that the error is routed through
+ * recordShardSubmitError instead.
+ */
+ @Test
+ public void testRejectedExecutorRecordsErrorInsteadOfThrowing() throws Exception {
+ CountDownLatch holdWorker = new CountDownLatch(1);
+ CountDownLatch workerStarted = new CountDownLatch(1);
+ ThreadPoolExecutor busyExecutor =
+ new ThreadPoolExecutor(
+ 1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); // default AbortPolicy
+ try {
+ // Occupy the single worker thread so the next submission has nowhere to go.
+ busyExecutor.execute(
+ () -> {
+ workerStarted.countDown();
+ try {
+ holdWorker.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ assertTrue("worker did not start within timeout", workerStarted.await(5, TimeUnit.SECONDS));
+
+ ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory();
+ factory.commExecutor = busyExecutor;
+ ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory);
+
+ ShardRequest shardRequest = buildShardRequest("shardA");
+
+ ShardResponse shardResponse = new ShardResponse();
+ shardResponse.setShardRequest(shardRequest);
+ shardResponse.setShard("shardA");
+
+ HttpShardHandler.SimpleSolrResponse simpleResponse =
+ new HttpShardHandler.SimpleSolrResponse();
+ shardResponse.setSolrResponse(simpleResponse);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ QueryRequest queryRequest = new QueryRequest(params);
+ LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr");
+ LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint));
+
+ // The desired contract: rejection is captured and surfaced through the responses queue
+ // (i.e., this call should not throw RejectedExecutionException).
+ try {
+ handler.makeShardRequest(
+ shardRequest,
+ "shardA",
+ params,
+ lbReq,
+ simpleResponse,
+ shardResponse,
+ System.nanoTime());
+ } catch (RejectedExecutionException ree) {
+ fail(
+ "makeShardRequest should not propagate RejectedExecutionException; the failure "
+ + "should be recorded via recordShardSubmitError. Got: "
+ + ree);
+ }
+
+ ShardResponse recorded = handler.responses.poll(2, TimeUnit.SECONDS);
+ assertNotNull(
+ "Expected the executor rejection to be recorded as a shard failure in the responses"
+ + " queue, but no response arrived",
+ recorded);
+ assertSame(
+ "The recorded shard response should be the same instance passed in",
+ shardResponse,
+ recorded);
+ assertNotNull(
+ "Expected an exception to be attached to the recorded shard response",
+ recorded.getException());
+ } finally {
+ holdWorker.countDown();
+ busyExecutor.shutdownNow();
+ busyExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ private ShardRequest buildShardRequest(String shard) {
+ ShardRequest sreq = new ShardRequest();
+ sreq.params = new ModifiableSolrParams();
+ sreq.actualShards = new String[] {shard};
+ return sreq;
+ }
+
+ /**
+ * Runs handler.takeCompletedIncludingErrors() on a worker thread with a timeout. If take() does
+ * not return within timeoutMs, fails the test with a clear message naming the iteration and phase
+ * — this is the signal for the lost-wakeup bug.
+ */
+ private ShardResponse runTakeWithTimeout(
+ ParallelHttpShardHandler handler,
+ ExecutorService takeExecutor,
+ int iteration,
+ String phaseLabel,
+ long timeoutMs)
+ throws Exception {
+ Future<ShardResponse> future = takeExecutor.submit(handler::takeCompletedIncludingErrors);
+ try {
+ return future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException te) {
+ future.cancel(true);
+ fail(
+ "take() hung in iteration "
+ + iteration
+ + " "
+ + phaseLabel
+ + ": did not return within "
+ + timeoutMs
+ + "ms. The worker thread is parked in LinkedBlockingQueue.take() waiting for"
+ + " an element that will never arrive because the handler's state transitioned"
+ + " to empty without anything being enqueued on the responses queue.");
+ throw new AssertionError("unreachable");
+ } catch (ExecutionException ee) {
+ throw new AssertionError(
+ "take() threw unexpectedly in iteration " + iteration + " " + phaseLabel, ee.getCause());
+ }
+ }
+
+ /**
+ * More aggressive variant of the lost-wakeup stress test that uses asynchronous inner-future
+ * completion on a dedicated scheduler. In production the inner future (from {@code
+ * lbClient.requestAsync}) completes on a Jetty IO thread, not synchronously at the registration
+ * site. That timing gap between {@code super.makeShardRequest} returning (and the outer {@code
+ * whenComplete} firing to remove {@code submitFutures}) and the inner {@code whenComplete} firing
+ * (to add to {@code responses}) is exactly where the observed 930-handler hang lives. This test
+ * matches that timing.
+ */
+ @Test
+ public void testTakeDoesNotHangUnderAsyncInnerFutureCompletion() throws Exception {
+ final int iterations = 1000;
+ final long perIterationTimeoutMs = 3_000;
+
+ ExecutorService commExecutor =
+ new ThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE,
+ 5L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("testCommExecutor"));
+
+ // Simulates Jetty IO threads: a small pool that completes the inner future asynchronously
+ // some tiny amount of time after requestAsync() returns, exposing the race window.
+ ExecutorService mockIoThreads =
+ Executors.newFixedThreadPool(2, new SolrNamedThreadFactory("testMockIo"));
+
+ ExecutorService takeExecutor =
+ Executors.newCachedThreadPool(new SolrNamedThreadFactory("testTakeRunner"));
+
+ try {
+ for (int i = 0; i < iterations; i++) {
+ runAsyncRaceCycle(commExecutor, mockIoThreads, takeExecutor, i, perIterationTimeoutMs);
+ }
+ } finally {
+ takeExecutor.shutdownNow();
+ takeExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ mockIoThreads.shutdownNow();
+ mockIoThreads.awaitTermination(5, TimeUnit.SECONDS);
+ commExecutor.shutdown();
+ if (!commExecutor.awaitTermination(15, TimeUnit.SECONDS)) {
+ commExecutor.shutdownNow();
+ commExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ }
Review Comment:
This stress test runs 1000 iterations with a 3-second per-iteration timeout.
Assuming each iteration goes through `runTakeWithTimeout`, a hard hang fails on
the first timed-out iteration, but a regression that slows every iteration to
just under the timeout could keep the test running for up to ~50 minutes
(1000 × 3 s) without failing. Combined with constructing a new handler, factory
and mock per iteration, this is heavy for the standard unit-test suite. Consider
either reducing the iteration count (if the race reproduces reliably within
50–100 iterations), shortening the per-iteration timeout, or annotating this as
a `@Nightly`/`@Slow` test so it does not slow down or destabilize routine CI
runs.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -278,22 +290,40 @@ protected void makeShardRequest(
// on the map already having the future.
future.whenComplete(
(LBSolrClient.Rsp rsp, Throwable throwable) -> {
- if (rsp != null) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- } else if (throwable != null) {
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
+ try {
+ if (rsp != null) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ } else if (throwable != null) {
+ srsp.setException(throwable);
+ if (throwable instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) throwable).code());
+ }
}
- }
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
- // Synchronize on cancelled so this code and cancelAll() cannot happen at the same time
- synchronized (canceled) {
- // We don't want to add responses after the requests have been canceled
- if (responseFutureMap.containsKey(srsp)) {
- responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard));
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(
+ System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
+ // Synchronize on cancelled so this code and cancelAll() cannot happen at the same time
+ synchronized (canceled) {
+ // We don't want to add responses after the requests have been canceled
+ if (responseFutureMap.containsKey(srsp)) {
+ responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard));
+ }
+ }
+ } catch (Throwable t) {
+ // If anything above throws (subclass transformResponse, malformed Rsp, OOM in the
+ // lambda) the response would never be enqueued — but responseFutureMap still tracks
+ // srsp, so take() would park forever. Record the failure on srsp and queue it
+ // directly, under the same monitor as the happy path so this stays atomic with
+ // cancelAll.
+ srsp.setException(t);
+ if (t instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) t).code());
+ }
+ synchronized (canceled) {
+ if (responseFutureMap.containsKey(srsp)) {
+ responses.add(srsp);
+ }
}
}
Review Comment:
The defensive `try { … } catch (Throwable t)` around the response-processing
block also catches `Error`s (e.g. `OutOfMemoryError`, `StackOverflowError`, and
other `VirtualMachineError`s) and silently turns them into a `ShardResponse`
exception. The code comment even calls out OOM as a reason this catch exists,
but swallowing `Error` is generally considered an anti-pattern. At minimum,
rethrow `Error`s after recording them (or catch `Exception` instead of
`Throwable`). Without the rethrow, an OOM on a Jetty IO thread becomes a plain
shard error and the JVM keeps running in an inconsistent state.
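A small sketch of the "record, then rethrow" shape, assuming hypothetical
stand-ins: `Result` plays the role of `ShardResponse` and `process` the role of
the `whenComplete` body. Neither name comes from the PR:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RethrowErrorSketch {

  // Hypothetical stand-in for ShardResponse: just carries the recorded failure.
  static final class Result {
    Throwable exception;
  }

  // Runs the work, always enqueues the result so a waiter is woken up,
  // and rethrows JVM-level Errors instead of swallowing them.
  static void process(Runnable work, Result result, BlockingQueue<Result> responses) {
    Throwable failure = null;
    try {
      work.run();
    } catch (Throwable t) {
      failure = t;
      result.exception = t; // record the failure on the response
    }
    responses.add(result); // enqueue on both the success and failure paths
    if (failure instanceof Error) {
      throw (Error) failure; // let OOM, StackOverflowError, etc. propagate
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Result> responses = new LinkedBlockingQueue<>();

    Result recoverable = new Result();
    process(() -> { throw new IllegalStateException("bad response"); }, recoverable, responses);
    System.out.println("recorded: " + recoverable.exception);

    Result fatal = new Result();
    try {
      process(() -> { throw new StackOverflowError("simulated"); }, fatal, responses);
    } catch (StackOverflowError e) {
      System.out.println("rethrown: " + e + ", queued results: " + responses.size());
    }
  }
}
```

The waiter is still unblocked in the fatal case, while the `Error` continues up
to whatever handler the calling thread has installed.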
##########
changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml:
##########
@@ -0,0 +1,31 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Fix several concurrency bugs in HttpShardHandler / ParallelHttpShardHandler that
+ could leave search threads parked in HttpShardHandler.take() indefinitely or crash the
+ coordinator under load. (1) ParallelHttpShardHandler.cancelAll and makeShardRequest
+ now synchronize their submitFutures operations on the same monitor that
+ HttpShardHandler uses, so the canceled flag, responseFutureMap, the responses queue's
+ CANCELLATION_NOTIFICATION, and submitFutures all transition atomically — matching
+ the base class's cancellation invariant. (2) RejectedExecutionException from
+ commExecutor (queue full or executor shutting down) is now caught and routed through
+ recordShardSubmitError as a SERVICE_UNAVAILABLE shard failure instead of propagating
+ synchronously out of submit() and crashing SearchHandler's distributed loop —
+ preserving shards.tolerant=true semantics under thread pool saturation. (3) The
+ Parallel runnable now reads its own outer CompletableFuture via an AtomicReference
+ holder and short-circuits when cancelled before scheduling, avoiding a wasted
+ lbClient.requestAsync that super.makeShardRequest would just immediately cancel.
+ (4) The inner whenComplete in HttpShardHandler.makeShardRequest now wraps response
+ processing in a try/catch — any throwable (including from a subclass
+ transformResponse) is recorded on the ShardResponse and queued under the canceled
+ monitor, so a thrown response handler can no longer leave responseFutureMap populated
+ with an entry that nothing will ever enqueue a response for. (5) HttpShardHandler.take()
+ now polls the responses queue with a 50ms timeout instead of blocking indefinitely,
+ closing a fast-path deadlock — when a subclass async tracker like submitFutures drains
+ AFTER the coordinator re-evaluated responsesPending()==true and entered responses.take(),
+ no further response will ever arrive to wake it. The polling loop re-checks
+ responsesPending() periodically and exits cleanly when the tracker drains.
Review Comment:
The changelog entry is unusually long (a ~25-line wall-of-text describing
five separate fixes). Compare with surrounding entries under
`changelog/unreleased/` which are typically one short sentence. Consider
trimming the body to a short user-facing summary (e.g. "Fix several concurrency
bugs in `HttpShardHandler` / `ParallelHttpShardHandler` that could cause search
threads to hang in `take()` or return HTTP 500 instead of honoring
`shards.tolerant` under thread-pool saturation") and leaving the per-bullet
implementation notes in the JIRA / PR description.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -328,7 +358,15 @@ private ShardResponse take(boolean bailOnError) {
ShardResponse previousResponse = null;
try {
while (responsesPending()) {
- ShardResponse rsp = responses.take();
+ // Poll instead of take() to avoid a fast-path deadlock in subclasses that gate
+ // responsesPending() on an async tracker (e.g. ParallelHttpShardHandler.submitFutures):
+ // the tracker can drain AFTER we observed responsesPending()==true but BEFORE we entered
+ // the queue wait, leaving us blocked on a wakeup that will never arrive. Polling lets the
+ // while-condition re-evaluate periodically and exit cleanly.
+ ShardResponse rsp = responses.poll(50, TimeUnit.MILLISECONDS);
+ if (rsp == null) {
+ continue;
+ }
Review Comment:
Switching from `responses.take()` to `responses.poll(50, MILLISECONDS)`
changes the wakeup model for *every* `HttpShardHandler` user, not just
`ParallelHttpShardHandler`. The base (non-parallel) handler doesn't have the
lost-wakeup race described in the comment — its `responsesPending()` is
computed entirely from state that is mutated under the same `canceled` monitor
as `responses.add(...)`. The polling therefore adds purely defensive cost (a
periodic wakeup every 50ms on every search thread waiting on shard responses,
and up to a 50ms shutdown/cancellation tail) for cases where it isn't needed.
Consider scoping the change to the subclass that actually needs it (e.g.
make `take()` use a hookable wait method that `ParallelHttpShardHandler`
overrides to poll, while the base class keeps the blocking `take()`), or fix
the underlying lost-wakeup more directly by having `ParallelHttpShardHandler`
enqueue a sentinel into `responses` when `submitFutures` drains so the base
class's blocking `take()` is properly woken up.
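A sketch of the hookable-wait idea, with everything simplified. `BaseHandler`,
`PollingHandler`, `awaitNextResponse`, and the two counters are hypothetical
names standing in for `HttpShardHandler`, `ParallelHttpShardHandler`, and their
internal bookkeeping. The base class keeps a blocking wait, and only the
subclass with an extra async tracker pays for polling:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookableWaitSketch {

  static class BaseHandler {
    protected final BlockingQueue<String> responses = new LinkedBlockingQueue<>();
    protected final AtomicInteger outstanding = new AtomicInteger(); // sent, not yet taken

    protected boolean responsesPending() {
      return outstanding.get() > 0;
    }

    // Hook: the base class blocks until a response arrives.
    protected String awaitNextResponse() throws InterruptedException {
      return responses.take();
    }

    String take() throws InterruptedException {
      while (responsesPending()) {
        String rsp = awaitNextResponse();
        if (rsp == null) {
          continue; // hook timed out; re-evaluate the loop condition
        }
        outstanding.decrementAndGet();
        return rsp;
      }
      return null;
    }
  }

  static class PollingHandler extends BaseHandler {
    // Hypothetical async tracker that can drain without enqueuing a response.
    protected final AtomicInteger submitting = new AtomicInteger();

    @Override
    protected boolean responsesPending() {
      return submitting.get() > 0 || super.responsesPending();
    }

    // Only this subclass polls, so it can notice the tracker draining.
    @Override
    protected String awaitNextResponse() throws InterruptedException {
      return responses.poll(50, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    BaseHandler base = new BaseHandler();
    base.outstanding.incrementAndGet();
    base.responses.add("shardA response");
    System.out.println(base.take());

    PollingHandler parallel = new PollingHandler();
    parallel.submitting.incrementAndGet();
    Thread drainer = new Thread(() -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      parallel.submitting.decrementAndGet(); // tracker drains, nothing enqueued
    });
    drainer.start();
    System.out.println(parallel.take()); // returns instead of hanging
    drainer.join();
  }
}
```

The sentinel alternative would avoid polling altogether, at the cost of teaching
the base `take()` loop to recognize and skip the sentinel value.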
##########
solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java:
##########
@@ -80,22 +87,64 @@ protected void makeShardRequest(
SimpleSolrResponse ssr,
ShardResponse srsp,
long startTimeNS) {
- CompletableFuture<Void> completableFuture =
- CompletableFuture.runAsync(
- () -> super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS),
- commExecutor);
- submitFutures.put(srsp, completableFuture);
+ // Holder so the lambda can read its own outer future. We can't capture the variable directly
+ // (it would have to be effectively final, but we assign it from runAsync). AtomicReference
+ // gives the lambda volatile-style visibility on the assignment that happens after runAsync
+ // returns.
+ AtomicReference<CompletableFuture<Void>> selfRef = new AtomicReference<>();
+ CompletableFuture<Void> completableFuture;
+ try {
+ completableFuture =
+ CompletableFuture.runAsync(
+ () -> {
+ // Skip the work if THIS specific outer future was cancelled (e.g. cancelAll cancelled
+ // it before this runnable got CPU time). Avoids a wasted lbClient.requestAsync that
+ // super.makeShardRequest would just immediately cancel anyway. selfRef may briefly
+ // be null if the runnable runs before the assignment below — in that case we fall
+ // through to super, which has its own canceled-check guard.
+ CompletableFuture<Void> self = selfRef.get();
+ if (self != null && self.isCancelled()) {
+ return;
+ }
+ super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS);
+ },
+ commExecutor);
+ } catch (RejectedExecutionException ree) {
+ // Saturation or shutdown of commExecutor would otherwise propagate synchronously,
+ // crash SearchHandler's distributed loop before cancelAll() runs, abandon any
+ // already-submitted shard requests, and return HTTP 500 even when shards.tolerant=true.
+ // Treat it as a shard failure so the responses queue stays consistent and shards.tolerant
+ // semantics are honored. SERVICE_UNAVAILABLE (503) marks it as transient.
+ recordShardSubmitError(
+ srsp,
+ new SolrException(
+ SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "Comm executor thread pool is full, unable to send request to shard: " + shard,
+ ree));
+ return;
+ }
Review Comment:
`recordShardSubmitError` (called here on rejection) only enqueues the shard
response into `responses` when `!canceled.get()`. That is the right behavior in
general, but combined with the rest of this method it means that if the
executor rejects because it's shutting down as part of cancellation, the shard
failure for `srsp` is silently dropped — no entry is added to `responses`, no
entry was added to `submitFutures` (we `return` before the registration block),
and the caller has no indication that this shard ever attempted to run.
Consider documenting this explicitly, or routing the rejection through a path
that still updates `submitFutures`/`responseFutureMap` so `take()`'s
bookkeeping stays exactly accurate even under shutdown-induced rejections.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]