This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new c72562aeb32 SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls (#2461)
c72562aeb32 is described below

commit c72562aeb326b6c9b7da6d4a85b4a5708af44e5a
Author: Mark Robert Miller <[email protected]>
AuthorDate: Mon Apr 21 22:28:20 2025 -0500

    SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls (#2461)
---
 solr/CHANGES.txt                                   |   2 +
 .../impl/ConcurrentUpdateHttp2SolrClient.java      |  97 +----
 .../solrj/impl/ConcurrentUpdateSolrClient.java     | 103 ++---
 .../solr/client/solrj/impl/StallDetection.java     | 160 ++++++++
 .../apache/solr/client/solrj/impl/TimeSource.java  |  35 ++
 .../solr/client/solrj/impl/FakeTimeSource.java     |  65 ++++
 .../solr/client/solrj/impl/StallDetectionTest.java | 420 +++++++++++++++++++++
 7 files changed, 727 insertions(+), 155 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6980b8678be..6ece23c3e52 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -265,6 +265,8 @@ Bug Fixes
 
 * SOLR-17740: When the V2 API is receiving raw files, it could sometimes skip the first byte. (David Smiley)
 
+* SOLR-17294: ConcurrentUpdateSolrClient no longer detects "false-positive" stalls. (Mark Miller, Jason Gerlowski)
+
 Dependency Upgrades
 ---------------------
 * SOLR-17471: Upgrade Lucene to 9.12.1. (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index dcbaff0a832..5603fac07e9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -64,11 +64,12 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
   private boolean shutdownClient;
   private boolean shutdownExecutor;
   private long pollQueueTimeMillis;
-  private long stallTimeMillis;
   private final boolean streamDeletes;
   private volatile boolean closed;
   private volatile CountDownLatch lock = null; // used to block everything
 
+  protected StallDetection stallDetection;
+
   private static class CustomBlockingQueue<E> implements Iterable<E> {
     private final BlockingQueue<E> queue;
     private final Semaphore available;
@@ -150,15 +151,19 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
     this.basePath = builder.baseSolrUrl;
     this.defaultCollection = builder.defaultCollection;
     this.pollQueueTimeMillis = builder.pollQueueTimeMillis;
-    this.stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime", 
15000);
+
+    // Initialize stall detection
+    long stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime", 
15000);
 
     // make sure the stall time is larger than the polling time
     // to give a chance for the queue to change
     long minimalStallTimeMillis = pollQueueTimeMillis * 2;
-    if (minimalStallTimeMillis > this.stallTimeMillis) {
-      this.stallTimeMillis = minimalStallTimeMillis;
+    if (minimalStallTimeMillis > stallTimeMillis) {
+      stallTimeMillis = minimalStallTimeMillis;
     }
 
+    this.stallDetection = new StallDetection(stallTimeMillis, queue::size);
+
     if (builder.executorService != null) {
       this.scheduler = builder.executorService;
       this.shutdownExecutor = false;
@@ -168,6 +173,8 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
               new SolrNamedThreadFactory("concurrentUpdateScheduler"));
       this.shutdownExecutor = true;
     }
+
+    // processedCount is now managed by StallDetection
   }
 
   /** Opens a connection and sends everything... */
@@ -289,6 +296,7 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
             } else {
               onSuccess(response, rspBody);
             }
+            stallDetection.incrementProcessedCount();
 
           } finally {
             try {
@@ -402,8 +410,6 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
       Update update = new Update(req, effectiveCollection);
       boolean success = queue.offer(update);
 
-      long lastStallTime = -1;
-      int lastQueueSize = -1;
       for (; ; ) {
         synchronized (runners) {
           // see if queue is half full, and we can add more runners
@@ -438,28 +444,7 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
         }
         if (!success) {
           // stall prevention
-          int currentQueueSize = queue.size();
-          if (currentQueueSize != lastQueueSize) {
-            // there's still some progress in processing the queue - not 
stalled
-            lastQueueSize = currentQueueSize;
-            lastStallTime = -1;
-          } else {
-            if (lastStallTime == -1) {
-              // mark a stall but keep trying
-              lastStallTime = System.nanoTime();
-            } else {
-              long currentStallTime =
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastStallTime);
-              if (currentStallTime > stallTimeMillis) {
-                throw new IOException(
-                    "Request processing has stalled for "
-                        + currentStallTime
-                        + "ms with "
-                        + queue.size()
-                        + " remaining elements in the queue.");
-              }
-            }
-          }
+          stallDetection.stallCheck();
         }
       }
     } catch (InterruptedException e) {
@@ -480,9 +465,6 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
       waitForEmptyQueue();
       interruptRunnerThreadsPolling();
 
-      long lastStallTime = -1;
-      int lastQueueSize = -1;
-
       synchronized (runners) {
 
         // NOTE: if the executor is shut down, runners may never become empty. 
A scheduled task may
@@ -498,29 +480,11 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
           // Need to check if the queue is empty before really considering 
this is finished
           // (SOLR-4260)
           int queueSize = queue.size();
-          // stall prevention
-          if (lastQueueSize != queueSize) {
-            // init, or no stall
-            lastQueueSize = queueSize;
-            lastStallTime = -1;
-          } else {
-            if (lastStallTime == -1) {
-              lastStallTime = System.nanoTime();
-            } else {
-              long currentStallTime =
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastStallTime);
-              if (currentStallTime > stallTimeMillis) {
-                throw new IOException(
-                    "Task queue processing has stalled for "
-                        + currentStallTime
-                        + " ms with "
-                        + queueSize
-                        + " remaining elements to process.");
-                //                Thread.currentThread().interrupt();
-                //                break;
-              }
-            }
+          // stall prevention - only if queue is not empty
+          if (queueSize > 0) {
+            stallDetection.stallCheck();
           }
+
           if (queueSize > 0 && runners.isEmpty()) {
             // TODO: can this still happen?
             log.warn(
@@ -558,8 +522,6 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
   private void waitForEmptyQueue() throws IOException {
     boolean threadInterrupted = Thread.currentThread().isInterrupted();
 
-    long lastStallTime = -1;
-    int lastQueueSize = -1;
     while (!queue.isEmpty()) {
       if (ExecutorUtil.isTerminated(scheduler)) {
         log.warn(
@@ -592,28 +554,9 @@ public class ConcurrentUpdateHttp2SolrClient extends 
SolrClient {
               queue.size());
         }
       }
-      int currentQueueSize = queue.size();
-      // stall prevention
-      if (currentQueueSize != lastQueueSize) {
-        lastQueueSize = currentQueueSize;
-        lastStallTime = -1;
-      } else {
-        lastQueueSize = currentQueueSize;
-        if (lastStallTime == -1) {
-          lastStallTime = System.nanoTime();
-        } else {
-          long currentStallTime = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
-          if (currentStallTime > stallTimeMillis) {
-            throw new IOException(
-                "Task queue processing has stalled for "
-                    + currentStallTime
-                    + " ms with "
-                    + currentQueueSize
-                    + " remaining elements to process.");
-            //            threadInterrupted = true;
-            //            break;
-          }
-        }
+      // Only check for stalls if the queue is not empty
+      if (!queue.isEmpty()) {
+        stallDetection.stallCheck();
       }
     }
     if (threadInterrupted) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index e4ec034982b..1eff51e4a5c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -83,7 +83,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   final int threadCount;
   boolean shutdownExecutor = false;
   int pollQueueTimeMillis = 250;
-  int stallTimeMillis;
   private final boolean streamDeletes;
   private boolean internalHttpClient;
   private final int connectionTimeout;
@@ -95,6 +94,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   AtomicInteger blockLoops;
   AtomicInteger emptyQueueLoops;
 
+  protected StallDetection stallDetection;
+
   /**
    * Use builder to construct this class. Uses the supplied HttpClient to send 
documents to the Solr
    * server.
@@ -121,16 +122,20 @@ public class ConcurrentUpdateSolrClient extends 
SolrClient {
     this.connectionTimeout = builder.connectionTimeoutMillis;
     this.soTimeout = builder.socketTimeoutMillis;
     this.pollQueueTimeMillis = builder.pollQueueTime;
-    this.stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime", 
15000);
     this.defaultCollection = builder.defaultCollection;
 
+    // Initialize stall detection
+    int stallTimeMillis = Integer.getInteger("solr.cloud.client.stallTime", 
15000);
+
     // make sure the stall time is larger than the polling time
     // to give a chance for the queue to change
     int minimalStallTime = pollQueueTimeMillis * 2;
-    if (minimalStallTime > this.stallTimeMillis) {
-      this.stallTimeMillis = minimalStallTime;
+    if (minimalStallTime > stallTimeMillis) {
+      stallTimeMillis = minimalStallTime;
     }
 
+    this.stallDetection = new StallDetection(stallTimeMillis, () -> 
queue.size());
+
     if (builder.executorService != null) {
       this.scheduler = builder.executorService;
       this.shutdownExecutor = false;
@@ -147,6 +152,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
       this.blockLoops = new AtomicInteger();
       this.emptyQueueLoops = new AtomicInteger();
     }
+
+    // processedCount is now managed by StallDetection
   }
 
   public Set<String> getUrlParamNames() {
@@ -397,6 +404,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           } else {
             onSuccess(response);
           }
+          stallDetection.incrementProcessedCount();
 
         } finally {
           try {
@@ -527,8 +535,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
       Update update = new Update(req, collection);
       boolean success = queue.offer(update);
 
-      long lastStallTime = -1;
-      int lastQueueSize = -1;
       for (; ; ) {
         synchronized (runners) {
           // see if queue is half full, and we can add more runners
@@ -562,29 +568,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient 
{
           success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
         }
         if (!success) {
-          // stall prevention
-          int currentQueueSize = queue.size();
-          if (currentQueueSize != lastQueueSize) {
-            // there's still some progress in processing the queue - not 
stalled
-            lastQueueSize = currentQueueSize;
-            lastStallTime = -1;
-          } else {
-            if (lastStallTime == -1) {
-              // mark a stall but keep trying
-              lastStallTime = System.nanoTime();
-            } else {
-              long currentStallTime =
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastStallTime);
-              if (currentStallTime > stallTimeMillis) {
-                throw new IOException(
-                    "Request processing has stalled for "
-                        + currentStallTime
-                        + "ms with "
-                        + queue.size()
-                        + " remaining elements in the queue.");
-              }
-            }
-          }
+          stallCheck();
         }
       }
     } catch (InterruptedException e) {
@@ -598,6 +582,10 @@ public class ConcurrentUpdateSolrClient extends SolrClient 
{
     return dummy;
   }
 
+  void stallCheck() throws IOException {
+    stallDetection.stallCheck();
+  }
+
   public synchronized void blockUntilFinished() throws IOException {
     lock = new CountDownLatch(1);
     try {
@@ -605,9 +593,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
       waitForEmptyQueue();
       interruptRunnerThreadsPolling();
 
-      long lastStallTime = -1;
-      int lastQueueSize = -1;
-
       synchronized (runners) {
 
         // NOTE: if the executor is shut down, runners may never become empty. 
A scheduled task may
@@ -625,29 +610,11 @@ public class ConcurrentUpdateSolrClient extends 
SolrClient {
           // Need to check if the queue is empty before really considering 
this is finished
           // (SOLR-4260)
           int queueSize = queue.size();
-          // stall prevention
-          if (lastQueueSize != queueSize) {
-            // init, or no stall
-            lastQueueSize = queueSize;
-            lastStallTime = -1;
-          } else {
-            if (lastStallTime == -1) {
-              lastStallTime = System.nanoTime();
-            } else {
-              long currentStallTime =
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastStallTime);
-              if (currentStallTime > stallTimeMillis) {
-                throw new IOException(
-                    "Task queue processing has stalled for "
-                        + currentStallTime
-                        + " ms with "
-                        + queueSize
-                        + " remaining elements to process.");
-                //                Thread.currentThread().interrupt();
-                //                break;
-              }
-            }
+          // stall prevention - only if queue is not empty
+          if (queueSize > 0) {
+            stallCheck();
           }
+
           if (queueSize > 0 && runners.isEmpty()) {
             // TODO: can this still happen?
             log.warn(
@@ -685,8 +652,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   private void waitForEmptyQueue() throws IOException {
     boolean threadInterrupted = Thread.currentThread().isInterrupted();
 
-    long lastStallTime = -1;
-    int lastQueueSize = -1;
     while (!queue.isEmpty()) {
       if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
       if (ExecutorUtil.isTerminated(scheduler)) {
@@ -720,28 +685,10 @@ public class ConcurrentUpdateSolrClient extends 
SolrClient {
               queue.size());
         }
       }
-      int currentQueueSize = queue.size();
-      // stall prevention
-      if (currentQueueSize != lastQueueSize) {
-        lastQueueSize = currentQueueSize;
-        lastStallTime = -1;
-      } else {
-        lastQueueSize = currentQueueSize;
-        if (lastStallTime == -1) {
-          lastStallTime = System.nanoTime();
-        } else {
-          long currentStallTime = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
-          if (currentStallTime > stallTimeMillis) {
-            throw new IOException(
-                "Task queue processing has stalled for "
-                    + currentStallTime
-                    + " ms with "
-                    + currentQueueSize
-                    + " remaining elements to process.");
-            //            threadInterrupted = true;
-            //            break;
-          }
-        }
+
+      // Only check for stalls if the queue is not empty
+      if (!queue.isEmpty()) {
+        stallCheck();
       }
     }
     if (threadInterrupted) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java
new file mode 100644
index 00000000000..16c7b641a0a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StallDetection.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.IntSupplier;
+
+/**
+ * Utility class for detecting stalls in request processing.
+ *
+ * <p>This class is used by {@link ConcurrentUpdateHttp2SolrClient} and {@link
+ * ConcurrentUpdateSolrClient} to detect when request processing has stalled, which can happen if
+ * the server is unresponsive or if there's a problem with the connection.
+ */
+public class StallDetection {
+  private final LongAdder processedCount;
+  private volatile long lastProcessedCount;
+  private final AtomicLong startStallTime = new AtomicLong(-1);
+  private final long stallTimeMillis;
+  private final IntSupplier queueSizeSupplier;
+  private final TimeSource timeSource;
+
+  /**
+   * Creates a new StallDetection instance.
+   *
+   * @param stallTimeMillis The time in milliseconds after which to consider processing stalled
+   * @param queueSizeSupplier A supplier that returns the current queue size
+   */
+  public StallDetection(long stallTimeMillis, IntSupplier queueSizeSupplier) {
+    this(stallTimeMillis, queueSizeSupplier, TimeSource.SYSTEM);
+  }
+
+  /**
+   * Creates a new StallDetection instance with a custom time source.
+   *
+   * @param stallTimeMillis The time in milliseconds after which to consider processing stalled
+   * @param queueSizeSupplier A supplier that returns the current queue size
+   * @param timeSource The time source to use for time measurements
+   */
+  public StallDetection(
+      long stallTimeMillis, IntSupplier queueSizeSupplier, TimeSource timeSource) {
+    if (stallTimeMillis < 0) {
+      throw new IllegalArgumentException("stallTimeMillis must be non-negative");
+    }
+    this.stallTimeMillis = stallTimeMillis;
+    this.queueSizeSupplier = queueSizeSupplier;
+    this.processedCount = new LongAdder();
+    this.lastProcessedCount = 0;
+    this.timeSource = timeSource;
+  }
+
+  /**
+   * Increments the processed count.
+   *
+   * <p>This should be called whenever a request is successfully processed.
+   */
+  public void incrementProcessedCount() {
+    processedCount.increment();
+  }
+
+  /**
+   * Checks if request processing has stalled.
+   *
+   * <p>This method should be called periodically to check if request processing has stalled. If the
+   * queue is not empty and the processed count hasn't changed since the last check, a timer is
+   * started. If the timer exceeds the stall time, an IOException is thrown.
+   *
+   * <p>This method will never throw an exception if the queue is empty.
+   *
+   * @throws IOException if request processing has stalled
+   */
+  public void stallCheck() throws IOException {
+    int currentQueueSize = queueSizeSupplier.getAsInt();
+    if (currentQueueSize == 0) {
+      // If the queue is empty, we're not stalled
+      startStallTime.set(-1);
+      return;
+    }
+
+    long processed = processedCount.sum();
+    if (processed > lastProcessedCount) {
+      // there's still some progress in processing the queue - not stalled
+      lastProcessedCount = processed;
+      startStallTime.set(-1); // Reset timer when we see progress
+    } else {
+      long currentStartStallTime = startStallTime.get();
+      long currentTime = timeSource.nanoTime();
+
+      // Start the timer if it hasn't been started yet
+      if (currentStartStallTime == -1) {
+        startStallTime.set(currentTime);
+        return; // First detection, give it time before throwing exception
+      }
+
+      // Calculate elapsed time since stall was first detected
+      long timeElapsed = TimeUnit.NANOSECONDS.toMillis(currentTime - currentStartStallTime);
+
+      if (timeElapsed > stallTimeMillis) {
+        // Double-check that the queue is still not empty before throwing
+        int latestQueueSize = queueSizeSupplier.getAsInt();
+        if (latestQueueSize > 0) {
+          throw new IOException(
+              "Request processing has stalled for "
+                  + timeElapsed
+                  + "ms with "
+                  + latestQueueSize
+                  + " remaining elements in the queue.");
+        } else {
+          // Queue is now empty, reset the timer
+          startStallTime.set(-1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Gets the current processed count.
+   *
+   * @return the current processed count
+   */
+  public long getProcessedCount() {
+    return processedCount.sum();
+  }
+
+  /**
+   * Gets the stall time in milliseconds.
+   *
+   * @return the stall time in milliseconds
+   */
+  public long getStallTimeMillis() {
+    return stallTimeMillis;
+  }
+
+  /**
+   * Resets the stall timer.
+   *
+   * <p>This can be useful if you know that processing hasn't actually stalled even though the
+   * processed count hasn't changed.
+   */
+  public void resetStallTimer() {
+    startStallTime.set(-1);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java
new file mode 100644
index 00000000000..82fcc96952c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/TimeSource.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+/**
+ * Interface to abstract time measurements for easier testing. This allows tests to control time
+ * advancement rather than relying on actual wall clock time.
+ */
+public interface TimeSource {
+
+  /**
+   * Returns the current value of the running Java Virtual Machine's high-resolution time source, in
+   * nanoseconds.
+   *
+   * @return the current value of the running Java Virtual Machine's high-resolution time source.
+   */
+  long nanoTime();
+
+  /** Default implementation that uses System.nanoTime(). */
+  TimeSource SYSTEM = System::nanoTime;
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java
new file mode 100644
index 00000000000..ee3e9de1487
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/FakeTimeSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fake time source for testing that allows precise control over the passage of time. Instead of
+ * relying on real system time and Thread.sleep(), tests can use this to advance time manually and
+ * test time-dependent logic deterministically.
+ */
+public class FakeTimeSource implements TimeSource {
+
+  private long nanoTime = 0;
+
+  @Override
+  public long nanoTime() {
+    return nanoTime;
+  }
+
+  /**
+   * Advances the fake time by the specified number of nanoseconds.
+   *
+   * @param nanos The number of nanoseconds to advance the clock
+   */
+  public void advanceNanos(long nanos) {
+    if (nanos < 0) {
+      throw new IllegalArgumentException("Cannot advance time backwards");
+    }
+    nanoTime += nanos;
+  }
+
+  /**
+   * Advances the fake time by the specified time amount.
+   *
+   * @param time The amount of time to advance
+   * @param unit The time unit
+   */
+  public void advance(long time, TimeUnit unit) {
+    advanceNanos(unit.toNanos(time));
+  }
+
+  /**
+   * Advances the fake time by the specified number of milliseconds.
+   *
+   * @param millis The number of milliseconds to advance the clock
+   */
+  public void advanceMillis(long millis) {
+    advance(millis, TimeUnit.MILLISECONDS);
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java
new file mode 100644
index 00000000000..17c4e75ff4e
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/StallDetectionTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StallDetectionTest extends SolrTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Test
+  public void testNoStallWithEmptyQueue() throws IOException {
+
+    long stallTimeMillis = 100;
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, () -> 
0, timeSource);
+
+    // This should not throw an exception because the queue is empty
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 10);
+
+    // This should still not throw an exception because the queue is empty
+    stallDetection.stallCheck();
+  }
+
+  @Test
+  public void testNoStallWithProgressingQueue() throws IOException {
+
+    long stallTimeMillis = 100;
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, () -> 
1, timeSource);
+
+    // Call stallCheck once to initialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time but not enough to trigger a stall
+    timeSource.advanceMillis(stallTimeMillis / 2);
+
+    // Increment the processed count to simulate progress
+    stallDetection.incrementProcessedCount();
+
+    // This should not throw an exception because progress was made
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 10);
+
+    // This still should not throw because the timer was reset during the last 
check
+    stallDetection.stallCheck();
+  }
+
+  @Test
+  public void testStallDetection() throws IOException {
+
+    long stallTimeMillis = 5;
+
+    FakeTimeSource timeSource = new FakeTimeSource();
+
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, () -> 
1, timeSource);
+
+    // Call stallCheck once to initialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 1);
+
+    // This should throw an IOException
+    IOException exception = expectThrows(IOException.class, 
stallDetection::stallCheck);
+    assertTrue(
+        "Exception message should mention stall time",
+        exception.getMessage().contains("Request processing has stalled for"));
+    assertTrue(
+        "Exception message should mention queue size",
+        exception.getMessage().contains("remaining elements in the queue"));
+  }
+
+  @Test
+  public void testResetStallTimer() throws IOException {
+
+    long stallTimeMillis = 15;
+
+    FakeTimeSource timeSource = new FakeTimeSource();
+
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, () -> 
1, timeSource);
+
+    // Call stallCheck once to initialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time, but not enough to trigger a stall
+    timeSource.advanceMillis(5);
+
+    // Reset the timer
+    stallDetection.resetStallTimer();
+
+    // Call stallCheck again to reinitialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time, but not enough to trigger a stall
+    timeSource.advanceMillis(5);
+
+    // This should not throw an exception because we reset the timer
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 1);
+
+    // Now it should throw an exception
+    expectThrows(IOException.class, stallDetection::stallCheck);
+  }
+
+  @Test
+  public void testDynamicQueueSize() throws IOException {
+
+    long stallTimeMillis = 15;
+    AtomicInteger queueSize = new AtomicInteger(1);
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, 
queueSize::get, timeSource);
+
+    // Call stallCheck once to initialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time but not enough to trigger a stall
+    timeSource.advanceMillis(stallTimeMillis / 2);
+
+    queueSize.set(0);
+
+    // This should not throw an exception and should reset the timer
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis * 2);
+
+    queueSize.set(1);
+
+    // This should not throw an exception because the timer was reset when 
queue was empty
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 5);
+
+    // Now it should throw an exception because queue is non-empty and no 
progress was made
+    expectThrows(IOException.class, stallDetection::stallCheck);
+  }
+
+  @Test
+  public void testGetProcessedCount() {
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(100, () -> 1, 
timeSource);
+
+    // Initially the processed count should be 0
+    assertEquals(0, stallDetection.getProcessedCount());
+
+    // Increment the processed count
+    stallDetection.incrementProcessedCount();
+
+    // Now the processed count should be 1
+    assertEquals(1, stallDetection.getProcessedCount());
+
+    // Increment multiple times
+    for (int i = 0; i < 5; i++) {
+      stallDetection.incrementProcessedCount();
+    }
+
+    // Now the processed count should be 6
+    assertEquals(6, stallDetection.getProcessedCount());
+  }
+
+  @Test
+  public void testGetStallTimeMillis() {
+    long stallTimeMillis = 100;
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, () -> 
1, timeSource);
+
+    // The stall time should be what we set
+    assertEquals(stallTimeMillis, stallDetection.getStallTimeMillis());
+
+    // Test with a different value
+    long differentStallTime = 500;
+    StallDetection anotherStallDetection =
+        new StallDetection(differentStallTime, () -> 1, timeSource);
+    assertEquals(differentStallTime, 
anotherStallDetection.getStallTimeMillis());
+  }
+
+  @Test
+  public void testQueueEmptiesDuringStallCheck() throws IOException {
+
+    long stallTimeMillis = 5;
+    AtomicInteger queueSize = new AtomicInteger(1);
+
+    FakeTimeSource timeSource = new FakeTimeSource();
+
+    StallDetection stallDetection = new StallDetection(stallTimeMillis, 
queueSize::get, timeSource);
+
+    // Call stallCheck once to initialize the timer
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 1);
+
+    // Change the queue size to 0 just before the next check
+    queueSize.set(0);
+
+    // This should not throw an exception because the queue is now empty
+    stallDetection.stallCheck();
+
+    // Change the queue size back to 1
+    queueSize.set(1);
+
+    // This should not throw an exception because the timer was reset when 
queue was empty
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time again
+    timeSource.advanceMillis(stallTimeMillis + 1);
+
+    // Now it should throw an exception
+    expectThrows(IOException.class, stallDetection::stallCheck);
+  }
+
+  @Test
+  public void testProgressResetsDuringStallCheck() throws IOException {
+
+    long stallTimeMillis = 200;
+
+    FakeTimeSource timeSource = new FakeTimeSource();
+
+    final IntSupplier fixedQueueSupplier = () -> 1;
+    StallDetection stallDetection =
+        new StallDetection(stallTimeMillis, fixedQueueSupplier, timeSource);
+
+    // First call - initialize timer
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time
+    timeSource.advanceMillis(stallTimeMillis + 50);
+
+    // Make first progress
+    stallDetection.incrementProcessedCount();
+
+    // Second call - should not throw due to progress
+    stallDetection.stallCheck();
+
+    // Advance time past the stall time again
+    timeSource.advanceMillis(stallTimeMillis + 50);
+
+    // Third call - should NOT throw because timer was reset on second call
+    try {
+      stallDetection.stallCheck();
+    } catch (IOException e) {
+      fail("Unexpected exception thrown: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testConcurrentAccess() throws Exception {
+
+    final long stallTimeMillis = 1000;
+    final FakeTimeSource timeSource = new FakeTimeSource();
+    final AtomicInteger queueSize = new AtomicInteger(10);
+    final StallDetection stallDetection =
+        new StallDetection(stallTimeMillis, queueSize::get, timeSource);
+
+    // Initialize the timer
+    stallDetection.stallCheck();
+
+    final int threadCount = 10;
+
+    final CyclicBarrier barrier = new CyclicBarrier(threadCount);
+    final CountDownLatch allDone = new CountDownLatch(threadCount);
+    final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+    ExecutorService executor =
+        ExecutorUtil.newMDCAwareFixedThreadPool(
+            threadCount, new SolrNamedThreadFactory("StallDetectionTest"));
+
+    try {
+      for (int i = 0; i < threadCount; i++) {
+        final int threadId = i;
+        executor.submit(
+            () -> {
+              try {
+                // Wait for all threads to be ready
+                barrier.await();
+
+                // Half the threads increment the progress counter
+                // The other half call stallCheck
+                if (threadId % 2 == 0) {
+                  // Increment progress
+                  for (int j = 0; j < 100; j++) {
+                    stallDetection.incrementProcessedCount();
+                    Thread.yield(); // Hint to allow other threads to run
+                  }
+                } else {
+                  // Check for stalls
+                  for (int j = 0; j < 100; j++) {
+                    try {
+                      stallDetection.stallCheck();
+                      // Advance time a bit between checks, but not enough to 
trigger a stall
+                      synchronized (timeSource) {
+                        timeSource.advanceMillis(5);
+                      }
+                      Thread.yield(); // Hint to allow other threads to run
+                    } catch (IOException e) {
+                      exceptionThrown.set(true);
+                      fail("Unexpected IOException: " + e.getMessage());
+                    }
+                  }
+                }
+              } catch (Exception e) {
+                exceptionThrown.set(true);
+                log.error("Exception in thread {}", threadId, e);
+              } finally {
+                allDone.countDown();
+              }
+            });
+      }
+
+      // Wait for all threads to complete
+      assertTrue("Timed out waiting for threads to complete", 
allDone.await(30, TimeUnit.SECONDS));
+
+      // Verify no exceptions were thrown
+      assertFalse("Exception was thrown during concurrent operation", 
exceptionThrown.get());
+
+      // Verify the final processed count is correct
+      assertEquals(
+          "Processed count should equal number of incrementing threads × 100",
+          (threadCount / 2) * 100,
+          stallDetection.getProcessedCount());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testZeroStallTime() {
+    // A zero stall time should be allowed but would cause quick stall 
detection
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(0, () -> 1, timeSource);
+    assertEquals(0, stallDetection.getStallTimeMillis());
+
+    try {
+      // Call stallCheck to initialize timer
+      stallDetection.stallCheck();
+
+      // Any advancement of time should trigger a stall with zero stall time
+      timeSource.advanceMillis(1);
+
+      // This should throw an exception immediately
+      expectThrows(IOException.class, stallDetection::stallCheck);
+    } catch (IOException e) {
+      fail("First call to stallCheck should not throw: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testNegativeStallTime() {
+    // Negative stall time should not be allowed
+    expectThrows(IllegalArgumentException.class, () -> new StallDetection(-1, 
() -> 1));
+  }
+
+  @Test
+  public void testVeryLargeStallTime() throws IOException {
+    // Test with a large stall time value (close to Long.MAX_VALUE)
+    long veryLargeStallTime = Long.MAX_VALUE / 2;
+    FakeTimeSource timeSource = new FakeTimeSource();
+    StallDetection stallDetection = new StallDetection(veryLargeStallTime, () 
-> 1, timeSource);
+
+    // First call to initialize timer
+    stallDetection.stallCheck();
+
+    // Advance time, but not enough to trigger a stall
+    timeSource.advanceMillis(Integer.MAX_VALUE);
+
+    // This should not throw an exception
+    stallDetection.stallCheck();
+
+    // Now increment to test the reset behavior also works with large values
+    stallDetection.incrementProcessedCount();
+    stallDetection.stallCheck();
+
+    // Advance time past the large stall time
+    // This would cause overflow if we're not careful
+    timeSource.advanceNanos(Long.MAX_VALUE - 100);
+
+    // Now try to advance time to very close to overflow
+    timeSource.advanceNanos(99);
+
+    // This should still not throw because we reset the timer earlier
+    stallDetection.stallCheck();
+  }
+}


Reply via email to