This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new c72562aeb32 SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls (#2461)
c72562aeb32 is described below
commit c72562aeb326b6c9b7da6d4a85b4a5708af44e5a
Author: Mark Robert Miller <[email protected]>
AuthorDate: Mon Apr 21 22:28:20 2025 -0500
SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls (#2461)
---
solr/CHANGES.txt | 2 +
.../impl/ConcurrentUpdateHttp2SolrClient.java | 97 +----
.../solrj/impl/ConcurrentUpdateSolrClient.java | 103 ++---
.../solr/client/solrj/impl/StallDetection.java | 160 ++++++++
.../apache/solr/client/solrj/impl/TimeSource.java | 35 ++
.../solr/client/solrj/impl/FakeTimeSource.java | 65 ++++
.../solr/client/solrj/impl/StallDetectionTest.java | 420 +++++++++++++++++++++
7 files changed, 727 insertions(+), 155 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6980b8678be..6ece23c3e52 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -265,6 +265,8 @@ Bug Fixes
* SOLR-17740: When the V2 API is receiving raw files, it could sometimes skip
the first byte. (David Smiley)
+* SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls. (Mark Miller, Jason Gerlowski)
+
Dependency Upgrades
---------------------
* SOLR-17471: Upgrade Lucene to 9.12.1. (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index dcbaff0a832..5603fac07e9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -64,11 +64,12 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
private boolean shutdownClient;
private boolean shutdownExecutor;
private long pollQueueTimeMillis;
- private long stallTimeMillis;
private final boolean streamDeletes;
private volatile boolean closed;
private volatile CountDownLatch lock = null; // used to block everything
+ protected StallDetection stallDetection;
+
private static class CustomBlockingQueue<E> implements Iterable<E> {
private final BlockingQueue<E> queue;
private final Semaphore available;
@@ -150,15 +151,19 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
this.basePath = builder.baseSolrUrl;
this.defaultCollection = builder.defaultCollection;
this.pollQueueTimeMillis = builder.pollQueueTimeMillis;
- this.stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime",
15000);
+
+ // Initialize stall detection
+ long stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime",
15000);
// make sure the stall time is larger than the polling time
// to give a chance for the queue to change
long minimalStallTimeMillis = pollQueueTimeMillis * 2;
- if (minimalStallTimeMillis > this.stallTimeMillis) {
- this.stallTimeMillis = minimalStallTimeMillis;
+ if (minimalStallTimeMillis > stallTimeMillis) {
+ stallTimeMillis = minimalStallTimeMillis;
}
+ this.stallDetection = new StallDetection(stallTimeMillis, queue::size);
+
if (builder.executorService != null) {
this.scheduler = builder.executorService;
this.shutdownExecutor = false;
@@ -168,6 +173,8 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
new SolrNamedThreadFactory("concurrentUpdateScheduler"));
this.shutdownExecutor = true;
}
+
+ // processedCount is now managed by StallDetection
}
/** Opens a connection and sends everything... */
@@ -289,6 +296,7 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
} else {
onSuccess(response, rspBody);
}
+ stallDetection.incrementProcessedCount();
} finally {
try {
@@ -402,8 +410,6 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
Update update = new Update(req, effectiveCollection);
boolean success = queue.offer(update);
- long lastStallTime = -1;
- int lastQueueSize = -1;
for (; ; ) {
synchronized (runners) {
// see if queue is half full, and we can add more runners
@@ -438,28 +444,7 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
}
if (!success) {
// stall prevention
- int currentQueueSize = queue.size();
- if (currentQueueSize != lastQueueSize) {
- // there's still some progress in processing the queue - not
stalled
- lastQueueSize = currentQueueSize;
- lastStallTime = -1;
- } else {
- if (lastStallTime == -1) {
- // mark a stall but keep trying
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Request processing has stalled for "
- + currentStallTime
- + "ms with "
- + queue.size()
- + " remaining elements in the queue.");
- }
- }
- }
+ stallDetection.stallCheck();
}
}
} catch (InterruptedException e) {
@@ -480,9 +465,6 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
waitForEmptyQueue();
interruptRunnerThreadsPolling();
- long lastStallTime = -1;
- int lastQueueSize = -1;
-
synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty.
A scheduled task may
@@ -498,29 +480,11 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
// Need to check if the queue is empty before really considering
this is finished
// (SOLR-4260)
int queueSize = queue.size();
- // stall prevention
- if (lastQueueSize != queueSize) {
- // init, or no stall
- lastQueueSize = queueSize;
- lastStallTime = -1;
- } else {
- if (lastStallTime == -1) {
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Task queue processing has stalled for "
- + currentStallTime
- + " ms with "
- + queueSize
- + " remaining elements to process.");
- // Thread.currentThread().interrupt();
- // break;
- }
- }
+ // stall prevention - only if queue is not empty
+ if (queueSize > 0) {
+ stallDetection.stallCheck();
}
+
if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen?
log.warn(
@@ -558,8 +522,6 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
private void waitForEmptyQueue() throws IOException {
boolean threadInterrupted = Thread.currentThread().isInterrupted();
- long lastStallTime = -1;
- int lastQueueSize = -1;
while (!queue.isEmpty()) {
if (ExecutorUtil.isTerminated(scheduler)) {
log.warn(
@@ -592,28 +554,9 @@ public class ConcurrentUpdateHttp2SolrClient extends
SolrClient {
queue.size());
}
}
- int currentQueueSize = queue.size();
- // stall prevention
- if (currentQueueSize != lastQueueSize) {
- lastQueueSize = currentQueueSize;
- lastStallTime = -1;
- } else {
- lastQueueSize = currentQueueSize;
- if (lastStallTime == -1) {
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Task queue processing has stalled for "
- + currentStallTime
- + " ms with "
- + currentQueueSize
- + " remaining elements to process.");
- // threadInterrupted = true;
- // break;
- }
- }
+ // Only check for stalls if the queue is not empty
+ if (!queue.isEmpty()) {
+ stallDetection.stallCheck();
}
}
if (threadInterrupted) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index e4ec034982b..1eff51e4a5c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -83,7 +83,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
final int threadCount;
boolean shutdownExecutor = false;
int pollQueueTimeMillis = 250;
- int stallTimeMillis;
private final boolean streamDeletes;
private boolean internalHttpClient;
private final int connectionTimeout;
@@ -95,6 +94,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
AtomicInteger blockLoops;
AtomicInteger emptyQueueLoops;
+ protected StallDetection stallDetection;
+
/**
* Use builder to construct this class. Uses the supplied HttpClient to send
documents to the Solr
* server.
@@ -121,16 +122,20 @@ public class ConcurrentUpdateSolrClient extends
SolrClient {
this.connectionTimeout = builder.connectionTimeoutMillis;
this.soTimeout = builder.socketTimeoutMillis;
this.pollQueueTimeMillis = builder.pollQueueTime;
- this.stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime",
15000);
this.defaultCollection = builder.defaultCollection;
+ // Initialize stall detection
+ int stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime",
15000);
+
// make sure the stall time is larger than the polling time
// to give a chance for the queue to change
int minimalStallTime = pollQueueTimeMillis * 2;
- if (minimalStallTime > this.stallTimeMillis) {
- this.stallTimeMillis = minimalStallTime;
+ if (minimalStallTime > stallTimeMillis) {
+ stallTimeMillis = minimalStallTime;
}
+ this.stallDetection = new StallDetection(stallTimeMillis, () ->
queue.size());
+
if (builder.executorService != null) {
this.scheduler = builder.executorService;
this.shutdownExecutor = false;
@@ -147,6 +152,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
this.blockLoops = new AtomicInteger();
this.emptyQueueLoops = new AtomicInteger();
}
+
+ // processedCount is now managed by StallDetection
}
public Set<String> getUrlParamNames() {
@@ -397,6 +404,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} else {
onSuccess(response);
}
+ stallDetection.incrementProcessedCount();
} finally {
try {
@@ -527,8 +535,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
Update update = new Update(req, collection);
boolean success = queue.offer(update);
- long lastStallTime = -1;
- int lastQueueSize = -1;
for (; ; ) {
synchronized (runners) {
// see if queue is half full, and we can add more runners
@@ -562,29 +568,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient
{
success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
}
if (!success) {
- // stall prevention
- int currentQueueSize = queue.size();
- if (currentQueueSize != lastQueueSize) {
- // there's still some progress in processing the queue - not
stalled
- lastQueueSize = currentQueueSize;
- lastStallTime = -1;
- } else {
- if (lastStallTime == -1) {
- // mark a stall but keep trying
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Request processing has stalled for "
- + currentStallTime
- + "ms with "
- + queue.size()
- + " remaining elements in the queue.");
- }
- }
- }
+ stallCheck();
}
}
} catch (InterruptedException e) {
@@ -598,6 +582,10 @@ public class ConcurrentUpdateSolrClient extends SolrClient
{
return dummy;
}
+ void stallCheck() throws IOException {
+ stallDetection.stallCheck();
+ }
+
public synchronized void blockUntilFinished() throws IOException {
lock = new CountDownLatch(1);
try {
@@ -605,9 +593,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
waitForEmptyQueue();
interruptRunnerThreadsPolling();
- long lastStallTime = -1;
- int lastQueueSize = -1;
-
synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty.
A scheduled task may
@@ -625,29 +610,11 @@ public class ConcurrentUpdateSolrClient extends
SolrClient {
// Need to check if the queue is empty before really considering
this is finished
// (SOLR-4260)
int queueSize = queue.size();
- // stall prevention
- if (lastQueueSize != queueSize) {
- // init, or no stall
- lastQueueSize = queueSize;
- lastStallTime = -1;
- } else {
- if (lastStallTime == -1) {
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Task queue processing has stalled for "
- + currentStallTime
- + " ms with "
- + queueSize
- + " remaining elements to process.");
- // Thread.currentThread().interrupt();
- // break;
- }
- }
+ // stall prevention - only if queue is not empty
+ if (queueSize > 0) {
+ stallCheck();
}
+
if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen?
log.warn(
@@ -685,8 +652,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
private void waitForEmptyQueue() throws IOException {
boolean threadInterrupted = Thread.currentThread().isInterrupted();
- long lastStallTime = -1;
- int lastQueueSize = -1;
while (!queue.isEmpty()) {
if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
if (ExecutorUtil.isTerminated(scheduler)) {
@@ -720,28 +685,10 @@ public class ConcurrentUpdateSolrClient extends
SolrClient {
queue.size());
}
}
- int currentQueueSize = queue.size();
- // stall prevention
- if (currentQueueSize != lastQueueSize) {
- lastQueueSize = currentQueueSize;
- lastStallTime = -1;
- } else {
- lastQueueSize = currentQueueSize;
- if (lastStallTime == -1) {
- lastStallTime = System.nanoTime();
- } else {
- long currentStallTime =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
- if (currentStallTime > stallTimeMillis) {
- throw new IOException(
- "Task queue processing has stalled for "
- + currentStallTime
- + " ms with "
- + currentQueueSize
- + " remaining elements to process.");
- // threadInterrupted = true;
- // break;
- }
- }
+
+ // Only check for stalls if the queue is not empty
+ if (!queue.isEmpty()) {
+ stallCheck();
}
}
if (threadInterrupted) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java
new file mode 100644
index 00000000000..16c7b641a0a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.IntSupplier;
+
+/**
+ * Utility class for detecting stalls in request processing.
+ *
+ * <p>This class is used by {@link ConcurrentUpdateHttp2SolrClient} and {@link
+ * ConcurrentUpdateSolrClient} to detect when request processing has stalled,
which can happen if
+ * the server is unresponsive or if there's a problem with the connection.
+ */
+public class StallDetection {
+ private final LongAdder processedCount;
+ private volatile long lastProcessedCount;
+ private final AtomicLong startStallTime = new AtomicLong(-1);
+ private final long stallTimeMillis;
+ private final IntSupplier queueSizeSupplier;
+ private final TimeSource timeSource;
+
+ /**
+ * Creates a new StallDetection instance.
+ *
+ * @param stallTimeMillis The time in milliseconds after which to consider
processing stalled
+ * @param queueSizeSupplier A supplier that returns the current queue size
+ */
+ public StallDetection(long stallTimeMillis, IntSupplier queueSizeSupplier) {
+ this(stallTimeMillis, queueSizeSupplier, TimeSource.SYSTEM);
+ }
+
+ /**
+ * Creates a new StallDetection instance with a custom time source.
+ *
+ * @param stallTimeMillis The time in milliseconds after which to consider
processing stalled
+ * @param queueSizeSupplier A supplier that returns the current queue size
+ * @param timeSource The time source to use for time measurements
+ */
+ public StallDetection(
+ long stallTimeMillis, IntSupplier queueSizeSupplier, TimeSource
timeSource) {
+ if (stallTimeMillis < 0) {
+ throw new IllegalArgumentException("stallTimeMillis must be
non-negative");
+ }
+ this.stallTimeMillis = stallTimeMillis;
+ this.queueSizeSupplier = queueSizeSupplier;
+ this.processedCount = new LongAdder();
+ this.lastProcessedCount = 0;
+ this.timeSource = timeSource;
+ }
+
+ /**
+ * Increments the processed count.
+ *
+ * <p>This should be called whenever a request is successfully processed.
+ */
+ public void incrementProcessedCount() {
+ processedCount.increment();
+ }
+
+ /**
+ * Checks if request processing has stalled.
+ *
+ * <p>This method should be called periodically to check if request
processing has stalled. If the
+ * queue is not empty and the processed count hasn't changed since the last
check, a timer is
+ * started. If the timer exceeds the stall time, an IOException is thrown.
+ *
+ * <p>This method will never throw an exception if the queue is empty.
+ *
+ * @throws IOException if request processing has stalled
+ */
+ public void stallCheck() throws IOException {
+ int currentQueueSize = queueSizeSupplier.getAsInt();
+ if (currentQueueSize == 0) {
+ // If the queue is empty, we're not stalled
+ startStallTime.set(-1);
+ return;
+ }
+
+ long processed = processedCount.sum();
+ if (processed > lastProcessedCount) {
+ // there's still some progress in processing the queue - not stalled
+ lastProcessedCount = processed;
+ startStallTime.set(-1); // Reset timer when we see progress
+ } else {
+ long currentStartStallTime = startStallTime.get();
+ long currentTime = timeSource.nanoTime();
+
+ // Start the timer if it hasn't been started yet
+ if (currentStartStallTime == -1) {
+ startStallTime.set(currentTime);
+ return; // First detection, give it time before throwing exception
+ }
+
+ // Calculate elapsed time since stall was first detected
+ long timeElapsed = TimeUnit.NANOSECONDS.toMillis(currentTime -
currentStartStallTime);
+
+ if (timeElapsed > stallTimeMillis) {
+ // Double-check that the queue is still not empty before throwing
+ int latestQueueSize = queueSizeSupplier.getAsInt();
+ if (latestQueueSize > 0) {
+ throw new IOException(
+ "Request processing has stalled for "
+ + timeElapsed
+ + "ms with "
+ + latestQueueSize
+ + " remaining elements in the queue.");
+ } else {
+ // Queue is now empty, reset the timer
+ startStallTime.set(-1);
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets the current processed count.
+ *
+ * @return the current processed count
+ */
+ public long getProcessedCount() {
+ return processedCount.sum();
+ }
+
+ /**
+ * Gets the stall time in milliseconds.
+ *
+ * @return the stall time in milliseconds
+ */
+ public long getStallTimeMillis() {
+ return stallTimeMillis;
+ }
+
+ /**
+ * Resets the stall timer.
+ *
+ * <p>This can be useful if you know that processing hasn't actually stalled
even though the
+ * processed count hasn't changed.
+ */
+ public void resetStallTimer() {
+ startStallTime.set(-1);
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java
new file mode 100644
index 00000000000..82fcc96952c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+/**
+ * Interface to abstract time measurements for easier testing. This allows
tests to control time
+ * advancement rather than relying on actual wall clock time.
+ */
+public interface TimeSource {
+
+ /**
+ * Returns the current value of the running Java Virtual Machine's
high-resolution time source, in
+ * nanoseconds.
+ *
+ * @return the current value of the running Java Virtual Machine's
high-resolution time source.
+ */
+ long nanoTime();
+
+ /** Default implementation that uses System.nanoTime(). */
+ TimeSource SYSTEM = System::nanoTime;
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java
new file mode 100644
index 00000000000..ee3e9de1487
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fake time source for testing that allows precise control over the passage
of time. Instead of
+ * relying on real system time and Thread.sleep(), tests can use this to
advance time manually and
+ * test time-dependent logic deterministically.
+ */
+public class FakeTimeSource implements TimeSource {
+
+ private long nanoTime = 0;
+
+ @Override
+ public long nanoTime() {
+ return nanoTime;
+ }
+
+ /**
+ * Advances the fake time by the specified number of nanoseconds.
+ *
+ * @param nanos The number of nanoseconds to advance the clock
+ */
+ public void advanceNanos(long nanos) {
+ if (nanos < 0) {
+ throw new IllegalArgumentException("Cannot advance time backwards");
+ }
+ nanoTime += nanos;
+ }
+
+ /**
+ * Advances the fake time by the specified time amount.
+ *
+ * @param time The amount of time to advance
+ * @param unit The time unit
+ */
+ public void advance(long time, TimeUnit unit) {
+ advanceNanos(unit.toNanos(time));
+ }
+
+ /**
+ * Advances the fake time by the specified number of milliseconds.
+ *
+ * @param millis The number of milliseconds to advance the clock
+ */
+ public void advanceMillis(long millis) {
+ advance(millis, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java
new file mode 100644
index 00000000000..17c4e75ff4e
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StallDetectionTest extends SolrTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Test
+ public void testNoStallWithEmptyQueue() throws IOException {
+
+ long stallTimeMillis = 100;
+ FakeTimeSource timeSource = new FakeTimeSource();
+ StallDetection stallDetection = new StallDetection(stallTimeMillis, () ->
0, timeSource);
+
+ // This should not throw an exception because the queue is empty
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis + 10);
+
+ // This should still not throw an exception because the queue is empty
+ stallDetection.stallCheck();
+ }
+
+ @Test
+ public void testNoStallWithProgressingQueue() throws IOException {
+
+ long stallTimeMillis = 100;
+ FakeTimeSource timeSource = new FakeTimeSource();
+ StallDetection stallDetection = new StallDetection(stallTimeMillis, () ->
1, timeSource);
+
+ // Call stallCheck once to initialize the timer
+ stallDetection.stallCheck();
+
+ // Advance time but not enough to trigger a stall
+ timeSource.advanceMillis(stallTimeMillis / 2);
+
+ // Increment the processed count to simulate progress
+ stallDetection.incrementProcessedCount();
+
+ // This should not throw an exception because progress was made
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis + 10);
+
+ // This still should not throw because the timer was reset during the last
check
+ stallDetection.stallCheck();
+ }
+
+ @Test
+ public void testStallDetection() throws IOException {
+
+ long stallTimeMillis = 5;
+
+ FakeTimeSource timeSource = new FakeTimeSource();
+
+ StallDetection stallDetection = new StallDetection(stallTimeMillis, () ->
1, timeSource);
+
+ // Call stallCheck once to initialize the timer
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis + 1);
+
+ // This should throw an IOException
+ IOException exception = expectThrows(IOException.class,
stallDetection::stallCheck);
+ assertTrue(
+ "Exception message should mention stall time",
+ exception.getMessage().contains("Request processing has stalled for"));
+ assertTrue(
+ "Exception message should mention queue size",
+ exception.getMessage().contains("remaining elements in the queue"));
+ }
+
+ @Test
+ public void testResetStallTimer() throws IOException {
+
+ long stallTimeMillis = 15;
+
+ FakeTimeSource timeSource = new FakeTimeSource();
+
+ StallDetection stallDetection = new StallDetection(stallTimeMillis, () ->
1, timeSource);
+
+ // Call stallCheck once to initialize the timer
+ stallDetection.stallCheck();
+
+ // Advance time, but not enough to trigger a stall
+ timeSource.advanceMillis(5);
+
+ // Reset the timer
+ stallDetection.resetStallTimer();
+
+ // Call stallCheck again to reinitialize the timer
+ stallDetection.stallCheck();
+
+ // Advance time, but not enough to trigger a stall
+ timeSource.advanceMillis(5);
+
+ // This should not throw an exception because we reset the timer
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis + 1);
+
+ // Now it should throw an exception
+ expectThrows(IOException.class, stallDetection::stallCheck);
+ }
+
+ @Test
+ public void testDynamicQueueSize() throws IOException {
+
+ long stallTimeMillis = 15;
+ AtomicInteger queueSize = new AtomicInteger(1);
+ FakeTimeSource timeSource = new FakeTimeSource();
+ StallDetection stallDetection = new StallDetection(stallTimeMillis,
queueSize::get, timeSource);
+
+ // Call stallCheck once to initialize the timer
+ stallDetection.stallCheck();
+
+ // Advance time but not enough to trigger a stall
+ timeSource.advanceMillis(stallTimeMillis / 2);
+
+ queueSize.set(0);
+
+ // This should not throw an exception and should reset the timer
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis * 2);
+
+ queueSize.set(1);
+
+ // This should not throw an exception because the timer was reset when
queue was empty
+ stallDetection.stallCheck();
+
+ // Advance time past the stall time
+ timeSource.advanceMillis(stallTimeMillis + 5);
+
+ // Now it should throw an exception because queue is non-empty and no
progress was made
+ expectThrows(IOException.class, stallDetection::stallCheck);
+ }
+
+ @Test
+ public void testGetProcessedCount() {
+ FakeTimeSource timeSource = new FakeTimeSource();
+ StallDetection stallDetection = new StallDetection(100, () -> 1,
timeSource);
+
+ // Initially the processed count should be 0
+ assertEquals(0, stallDetection.getProcessedCount());
+
+ // Increment the processed count
+ stallDetection.incrementProcessedCount();
+
+ // Now the processed count should be 1
+ assertEquals(1, stallDetection.getProcessedCount());
+
+ // Increment multiple times
+ for (int i = 0; i < 5; i++) {
+ stallDetection.incrementProcessedCount();
+ }
+
+ // Now the processed count should be 6
+ assertEquals(6, stallDetection.getProcessedCount());
+ }
+
+ @Test
+ public void testGetStallTimeMillis() {
+ long stallTimeMillis = 100;
+ FakeTimeSource timeSource = new FakeTimeSource();
+ StallDetection stallDetection = new StallDetection(stallTimeMillis, () ->
1, timeSource);
+
+ // The stall time should be what we set
+ assertEquals(stallTimeMillis, stallDetection.getStallTimeMillis());
+
+ // Test with a different value
+ long differentStallTime = 500;
+ StallDetection anotherStallDetection =
+ new StallDetection(differentStallTime, () -> 1, timeSource);
+ assertEquals(differentStallTime,
anotherStallDetection.getStallTimeMillis());
+ }
+
+ @Test
+ public void testQueueEmptiesDuringStallCheck() throws IOException {
+   // Scenario: the queue drains to empty after the stall time has elapsed and is then
+   // refilled. Only a later period beyond the stall time with a non-empty queue and no
+   // progress is expected to raise an IOException.
+   final long stallMillis = 5;
+   final long pastStall = stallMillis + 1;
+   final AtomicInteger pending = new AtomicInteger(1);
+   final FakeTimeSource clock = new FakeTimeSource();
+   final StallDetection detector = new StallDetection(stallMillis, pending::get, clock);
+
+   // First check with a non-empty queue; this is the call that initializes the timer
+   detector.stallCheck();
+
+   // Let more than the stall time go by, but empty the queue just before the next check
+   clock.advanceMillis(pastStall);
+   pending.set(0);
+
+   // Must not throw: the queue is empty when the check runs
+   detector.stallCheck();
+
+   // Refill the queue. Must not throw: the test expects the empty-queue check above to
+   // have reset the timer
+   pending.set(1);
+   detector.stallCheck();
+
+   // Exceed the stall time once more, queue still non-empty and nothing processed
+   clock.advanceMillis(pastStall);
+
+   // Now a stall must be reported
+   expectThrows(IOException.class, detector::stallCheck);
+ }
+
+ @Test
+ public void testProgressResetsDuringStallCheck() throws IOException {
+   // Verifies that progress recorded between two checks suppresses a stall report, and
+   // that the check following such progress does not report a stall either, even though
+   // more than the stall time has elapsed again. Any IOException from stallCheck()
+   // propagates and fails the test.
+
+   long stallTimeMillis = 200;
+
+   FakeTimeSource timeSource = new FakeTimeSource();
+
+   final IntSupplier fixedQueueSupplier = () -> 1;
+   StallDetection stallDetection =
+       new StallDetection(stallTimeMillis, fixedQueueSupplier, timeSource);
+
+   // First call - initialize timer
+   stallDetection.stallCheck();
+
+   // Advance time past the stall time
+   timeSource.advanceMillis(stallTimeMillis + 50);
+
+   // Make first progress
+   stallDetection.incrementProcessedCount();
+
+   // Second call - should not throw due to progress
+   stallDetection.stallCheck();
+
+   // Advance time past the stall time again
+   timeSource.advanceMillis(stallTimeMillis + 50);
+
+   // Third call - should NOT throw because timer was reset on second call.
+   // Invoked directly: this method already declares IOException, so an unexpected stall
+   // fails the test with the original exception and stack trace instead of a fail()
+   // message that only carries e.getMessage().
+   stallDetection.stallCheck();
+ }
+
+ @Test
+ public void testConcurrentAccess() throws Exception {
+   // Runs ten threads against one StallDetection at the same time: even-numbered threads
+   // record progress, odd-numbered threads call stallCheck() and advance the fake clock.
+   // The test passes when no worker hit an exception and the processed count equals the
+   // number of increments issued.
+
+   final int threadCount = 10;
+   final int iterations = 100;
+   final long advancePerCheckMillis = 5;
+
+   // Odd thread ids are the checkers; with an even threadCount that is half the threads
+   final int checkerCount = threadCount / 2;
+
+   // Upper bound on the simulated time all checkers together can add. The stall time is
+   // set above this bound so that no stall can be reported regardless of scheduling.
+   // With a smaller stall time, incrementing threads that finish early would leave the
+   // remaining checks to accumulate time without progress and the test could fail
+   // spuriously.
+   final long maxSimulatedMillis = checkerCount * iterations * advancePerCheckMillis;
+   final long stallTimeMillis = 2 * maxSimulatedMillis;
+
+   final FakeTimeSource timeSource = new FakeTimeSource();
+   final AtomicInteger queueSize = new AtomicInteger(10);
+   final StallDetection stallDetection =
+       new StallDetection(stallTimeMillis, queueSize::get, timeSource);
+
+   // Initialize the timer
+   stallDetection.stallCheck();
+
+   final CyclicBarrier barrier = new CyclicBarrier(threadCount);
+   final CountDownLatch allDone = new CountDownLatch(threadCount);
+   final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+   ExecutorService executor =
+       ExecutorUtil.newMDCAwareFixedThreadPool(
+           threadCount, new SolrNamedThreadFactory("StallDetectionTest"));
+
+   try {
+     for (int i = 0; i < threadCount; i++) {
+       final int threadId = i;
+       executor.submit(
+           () -> {
+             try {
+               // Wait for all threads to be ready
+               barrier.await();
+
+               // Half the threads increment the progress counter
+               // The other half call stallCheck
+               if (threadId % 2 == 0) {
+                 // Increment progress
+                 for (int j = 0; j < iterations; j++) {
+                   stallDetection.incrementProcessedCount();
+                   Thread.yield(); // Hint to allow other threads to run
+                 }
+               } else {
+                 // Check for stalls
+                 for (int j = 0; j < iterations; j++) {
+                   stallDetection.stallCheck();
+                   // Advance time a bit between checks; the total stays below the
+                   // stall time. The lock is kept from the original test - presumably
+                   // it serializes clock updates (TODO confirm against FakeTimeSource)
+                   synchronized (timeSource) {
+                     timeSource.advanceMillis(advancePerCheckMillis);
+                   }
+                   Thread.yield(); // Hint to allow other threads to run
+                 }
+               }
+             } catch (Exception e) {
+               // Also covers an IOException from stallCheck(). The failure is recorded
+               // and logged here rather than raised with fail(): an AssertionError
+               // thrown on a pool thread never reaches the test thread.
+               exceptionThrown.set(true);
+               log.error("Exception in thread {}", threadId, e);
+             } finally {
+               allDone.countDown();
+             }
+           });
+     }
+
+     // Wait for all threads to complete
+     assertTrue(
+         "Timed out waiting for threads to complete", allDone.await(30, TimeUnit.SECONDS));
+
+     // Verify no exceptions were thrown
+     assertFalse("Exception was thrown during concurrent operation", exceptionThrown.get());
+
+     // Verify the final processed count is correct
+     assertEquals(
+         "Processed count should equal number of incrementing threads × 100",
+         (threadCount / 2) * iterations,
+         stallDetection.getProcessedCount());
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ @Test
+ public void testZeroStallTime() {
+   // A stall time of zero is accepted by the constructor; the test expects a stall to
+   // be reported as soon as any time at all has passed after the first check.
+   final FakeTimeSource clock = new FakeTimeSource();
+   final StallDetection detector = new StallDetection(0, () -> 1, clock);
+   assertEquals(0, detector.getStallTimeMillis());
+
+   try {
+     // The first check only initializes the timer
+     detector.stallCheck();
+
+     // One millisecond later the zero threshold has already been exceeded
+     clock.advanceMillis(1);
+
+     // So the second check has to raise an IOException
+     expectThrows(IOException.class, detector::stallCheck);
+   } catch (IOException e) {
+     fail("First call to stallCheck should not throw: " + e.getMessage());
+   }
+ }
+
+ @Test
+ public void testNegativeStallTime() {
+   // Constructing a detector with a stall time below zero must be rejected
+   expectThrows(
+       IllegalArgumentException.class,
+       () -> {
+         new StallDetection(-1, () -> 1);
+       });
+ }
+
+ @Test
+ public void testVeryLargeStallTime() throws IOException {
+   // Drives a detector configured with a stall time of Long.MAX_VALUE / 2 milliseconds
+   // through very large clock jumps and expects that no stall is ever reported.
+   final long hugeStallMillis = Long.MAX_VALUE / 2;
+   final FakeTimeSource clock = new FakeTimeSource();
+   final StallDetection detector = new StallDetection(hugeStallMillis, () -> 1, clock);
+
+   // First check, to initialize the timer
+   detector.stallCheck();
+
+   // Jump ahead by Integer.MAX_VALUE ms - far less than the configured stall time -
+   // and check again; this must not throw
+   clock.advanceMillis(Integer.MAX_VALUE);
+   detector.stallCheck();
+
+   // Record progress and check once more, so the reset path is exercised with large
+   // values as well
+   detector.incrementProcessedCount();
+   detector.stallCheck();
+
+   // Push the fake clock forward by almost Long.MAX_VALUE nanoseconds, in two steps.
+   // Careless arithmetic on the clock value would be at risk of overflowing here.
+   clock.advanceNanos(Long.MAX_VALUE - 100);
+   clock.advanceNanos(99);
+
+   // Still expected not to throw, because the timer was reset by the progress above
+   detector.stallCheck();
+ }
+}