This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch branch_10_0
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_10_0 by this push:
     new 8c4eca8db20 SOLR-17947: CloudSolrClient refreshes collection state asynchronously using a dedicated thread pool to reduce ZooKeeper blocking under load. (#3962)
8c4eca8db20 is described below

commit 8c4eca8db20b461129856b30dc07d179ee1f2439
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Dec 17 15:48:29 2025 -0600

    SOLR-17947: CloudSolrClient refreshes collection state asynchronously using a dedicated thread pool to reduce ZooKeeper blocking under load. (#3962)
---
 ...R-17947-cloudsolrclient async state refresh.yml |   8 +
 .../client/solrj/impl/CloudHttp2SolrClient.java    |  10 +-
 .../solr/client/solrj/impl/CloudSolrClient.java    | 305 ++++++++++++---
 .../solrj/impl/CloudSolrClientCacheTest.java       | 417 ++++++++++++++++++++-
 4 files changed, 674 insertions(+), 66 deletions(-)

diff --git a/changelog/unreleased/SOLR-17947-cloudsolrclient async state refresh.yml b/changelog/unreleased/SOLR-17947-cloudsolrclient async state refresh.yml
new file mode 100644
index 00000000000..da7aa452ee1
--- /dev/null
+++ b/changelog/unreleased/SOLR-17947-cloudsolrclient async state refresh.yml   
@@ -0,0 +1,8 @@
+title: CloudSolrClient now refreshes collection state asynchronously using a dedicated
+  thread pool, reducing ZooKeeper blocking and improving performance under load.
+type: changed
+authors:
+- name: Mark Miller
+links:
+- name: SOLR-17947
+  url: https://issues.apache.org/jira/browse/SOLR-17947
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index 1eb61d4c9c5..72cfd2d042e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -44,7 +44,11 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
    * @param builder a {@link CloudSolrClient.Builder} with the options used to 
create the client.
    */
   protected CloudHttp2SolrClient(Builder builder) {
-    super(builder.shardLeadersOnly, builder.parallelUpdates, 
builder.directUpdatesToLeadersOnly);
+    super(
+        builder.shardLeadersOnly,
+        builder.parallelUpdates,
+        builder.directUpdatesToLeadersOnly,
+        builder.parallelCacheRefreshesLocks);
     this.clientIsInternal = builder.httpClient == null;
     try {
       this.myClient = builder.createOrGetHttpClient();
@@ -65,10 +69,6 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
 
     this.collectionStateCache.timeToLiveMs =
         TimeUnit.MILLISECONDS.convert(builder.timeToLiveSeconds, 
TimeUnit.SECONDS);
-
-    //  If caches are expired then they are refreshed after acquiring a lock. 
Set the number of
-    // locks.
-    this.locks = objectList(builder.parallelCacheRefreshesLocks);
   }
 
   private ClusterStateProvider createClusterStateProvider(Builder builder) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 6eabbf6ddb0..c9199fdafff 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -37,10 +37,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,7 +76,6 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.CollectionUtil;
 import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -97,6 +98,7 @@ public abstract class CloudSolrClient extends SolrClient {
   // no of times collection state to be reloaded if stale state error is 
received
   private static final int MAX_STALE_RETRIES =
       
Integer.parseInt(System.getProperty("solr.solrj.cloud.max.stale.retries", "5"));
+  static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5;
   private final Random rand = new Random();
 
   private final boolean updatesToLeaders;
@@ -125,7 +127,11 @@ public abstract class CloudSolrClient extends SolrClient {
           // UpdateParams.ROLLBACK
           );
 
-  protected volatile Object[] locks = objectList(3);
+  private final ConcurrentHashMap<String, CompletableFuture<DocCollection>> 
collectionRefreshes =
+      new ConcurrentHashMap<>();
+  private final Semaphore stateRefreshSemaphore;
+  private final int stateRefreshParallelism;
+  private volatile boolean closed;
 
   /**
    * Constructs {@link CloudSolrClient} instances from provided configuration. 
It will use a Jetty
@@ -175,7 +181,7 @@ public abstract class CloudSolrClient extends SolrClient {
 
     protected String defaultCollection;
     protected long timeToLiveSeconds = 60;
-    protected int parallelCacheRefreshesLocks = 3;
+    protected int parallelCacheRefreshesLocks = 
DEFAULT_STATE_REFRESH_PARALLELISM;
     protected int zkConnectTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
     protected int zkClientTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
     protected boolean canUseZkACLs = true;
@@ -323,10 +329,10 @@ public abstract class CloudSolrClient extends SolrClient {
     }
 
     /**
-     * When caches are expired then they are refreshed after acquiring a lock. 
Use this to set the
-     * number of locks.
+     * Configures how many collection state refresh operations may run in 
parallel using a dedicated
+     * thread pool. This controls the maximum number of concurrent 
ZooKeeper/cluster state lookups.
      *
-     * <p>Defaults to 3.
+     * <p>Defaults to 5.
      */
     public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks) 
{
       this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
@@ -512,6 +518,10 @@ public abstract class CloudSolrClient extends SolrClient {
       return val;
     }
 
+    ExpiringCachedDocCollection peek(Object key) {
+      return super.get(key);
+    }
+
     @Override
     public ExpiringCachedDocCollection put(String key, 
ExpiringCachedDocCollection value) {
       puts.incrementAndGet();
@@ -565,14 +575,46 @@ public abstract class CloudSolrClient extends SolrClient {
     void setRetriedAt() {
       retriedAtNano = System.nanoTime();
     }
+
+    /**
+     * Marks this entry as {@code maybeStale} if the provided backoff window 
has elapsed since the
+     * last retry.
+     *
+     * @return {@code true} if the entry was flagged as maybe stale
+     */
+    boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
+      if (maybeStale) {
+        return true;
+      }
+      long lastRetry = retriedAtNano;
+      if (lastRetry != -1 && (System.nanoTime() - lastRetry) <= 
retryBackoffNano) {
+        return false;
+      }
+      maybeStale = true;
+      return true;
+    }
   }
 
   protected CloudSolrClient(
       boolean updatesToLeaders, boolean parallelUpdates, boolean 
directUpdatesToLeadersOnly) {
+    this(
+        updatesToLeaders,
+        parallelUpdates,
+        directUpdatesToLeadersOnly,
+        DEFAULT_STATE_REFRESH_PARALLELISM);
+  }
+
+  protected CloudSolrClient(
+      boolean updatesToLeaders,
+      boolean parallelUpdates,
+      boolean directUpdatesToLeadersOnly,
+      int stateRefreshThreads) {
     this.updatesToLeaders = updatesToLeaders;
     this.parallelUpdates = parallelUpdates;
     this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
     this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
+    this.stateRefreshParallelism = Math.max(1, stateRefreshThreads);
+    this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
   }
 
   protected abstract LBSolrClient getLbClient();
@@ -598,6 +640,8 @@ public abstract class CloudSolrClient extends SolrClient {
 
   @Override
   public void close() {
+    closed = true;
+    collectionRefreshes.clear();
     if (this.threadPool != null && !ExecutorUtil.isShutdown(this.threadPool)) {
       ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
       this.threadPool = null;
@@ -1081,7 +1125,13 @@ public abstract class CloudSolrClient extends SolrClient 
{
 
     List<String> inputCollections =
         collection == null ? Collections.emptyList() : 
StrUtils.splitSmart(collection, ",", true);
-    return requestWithRetryOnStaleState(request, 0, inputCollections);
+    return requestWithRetryOnStaleState(
+        request,
+        0,
+        inputCollections,
+        /*skipStateVersion*/ false,
+        Map.of(),
+        /*waitedForRefresh*/ false);
   }
 
   /**
@@ -1090,7 +1140,12 @@ public abstract class CloudSolrClient extends SolrClient 
{
    * and retried.
    */
   protected NamedList<Object> requestWithRetryOnStaleState(
-      SolrRequest<?> request, int retryCount, List<String> inputCollections)
+      SolrRequest<?> request,
+      int retryCount,
+      List<String> inputCollections,
+      boolean skipStateVersion,
+      Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
+      boolean waitedForRefresh)
       throws SolrServerException, IOException {
     // build up a _stateVer_ param to pass to the server containing all the
     // external collection state versions involved in this request, which 
allows
@@ -1139,7 +1194,7 @@ public abstract class CloudSolrClient extends SolrClient {
     }
 
     if (request.getParams() instanceof ModifiableSolrParams params) {
-      if (stateVerParam != null) {
+      if (!skipStateVersion && stateVerParam != null) {
         params.set(STATE_VERSION, stateVerParam);
       } else {
         params.remove(STATE_VERSION);
@@ -1201,9 +1256,21 @@ public abstract class CloudSolrClient extends SolrClient 
{
         // in retryExpiryTime time
         if (requestedCollections != null) {
           for (DocCollection ext : requestedCollections) {
-            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.get(ext.getName());
-            if (cacheEntry == null) continue;
-            cacheEntry.maybeStale = true;
+            String name = ext.getName();
+            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(name);
+            if (cacheEntry != null) {
+              if (wasCommError) {
+                cacheEntry.maybeStale = true;
+              } else {
+                boolean markedStale =
+                    
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
+                if (markedStale && cacheEntry.shouldRetry()) {
+                  triggerCollectionRefresh(name);
+                }
+              }
+            } else {
+              triggerCollectionRefresh(name);
+            }
           }
         }
         if (retryCount < MAX_STALE_RETRIES) { // if it is a communication 
error , we must try again
@@ -1220,7 +1287,13 @@ public abstract class CloudSolrClient extends SolrClient 
{
               MAX_STALE_RETRIES,
               wasCommError,
               errorCode);
-          return requestWithRetryOnStaleState(request, retryCount + 1, 
inputCollections);
+          return requestWithRetryOnStaleState(
+              request,
+              retryCount + 1,
+              inputCollections,
+              skipStateVersion,
+              pendingRefreshes,
+              waitedForRefresh);
         }
       } else {
         log.info("request was not communication error it seems");
@@ -1272,16 +1345,57 @@ public abstract class CloudSolrClient extends 
SolrClient {
         }
       }
 
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
-
       // if the state was stale, then we retry the request once with new state 
pulled from Zk
       if (stateWasStale) {
         log.warn(
             "Re-trying request to collection(s) {} after stale state error 
from server.",
             inputCollections);
-        resp = requestWithRetryOnStaleState(request, retryCount + 1, 
inputCollections);
+
+        Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor = 
pendingRefreshes;
+        if (!waitedForRefresh && (pendingRefreshes == null || 
pendingRefreshes.isEmpty())) {
+          refreshesToWaitFor = new HashMap<>();
+          for (DocCollection ext : requestedCollections) {
+            refreshesToWaitFor.put(ext.getName(), 
triggerCollectionRefresh(ext.getName()));
+          }
+        }
+
+        // First retry without sending state versions so the server does not 
immediately reject the
+        // request while we intentionally rely on stale routing (e.g., to 
allow forwarding to a new
+        // leader) as the background refresh completes.
+        if (!skipStateVersion && !waitedForRefresh) {
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ true,
+                  refreshesToWaitFor,
+                  waitedForRefresh);
+        } else if (!waitedForRefresh
+            && refreshesToWaitFor != null
+            && !refreshesToWaitFor.isEmpty()) {
+          for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
+              refreshesToWaitFor.entrySet()) {
+            waitForCollectionRefresh(entry.getKey(), entry.getValue());
+          }
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ false,
+                  Map.of(),
+                  /*waitedForRefresh*/ true);
+        } else {
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ false,
+                  Map.of(),
+                  /*waitedForRefresh*/ waitedForRefresh);
+        }
       } else {
         if (exc instanceof SolrException
             || exc instanceof SolrServerException
@@ -1291,6 +1405,10 @@ public abstract class CloudSolrClient extends SolrClient 
{
           throw new SolrServerException(rootCause);
         }
       }
+
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
     }
 
     return resp;
@@ -1505,54 +1623,125 @@ public abstract class CloudSolrClient extends 
SolrClient {
     return directUpdatesToLeadersOnly;
   }
 
-  protected static Object[] objectList(int n) {
-    Object[] l = new Object[n];
-    for (int i = 0; i < n; i++) {
-      l[i] = new Object();
-    }
-    return l;
+  /** Visible for tests so they can assert the configured refresh parallelism. 
*/
+  protected int getStateRefreshParallelism() {
+    return stateRefreshParallelism;
   }
 
   protected DocCollection getDocCollection(String collection, Integer 
expectedVersion)
       throws SolrException {
-    if (expectedVersion == null) expectedVersion = -1;
-    if (collection == null) return null;
-    ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.get(collection);
-    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
-    if (col != null) {
-      if (expectedVersion <= col.getZNodeVersion() && 
!cacheEntry.shouldRetry()) return col;
-    }
-
-    Object[] locks = this.locks;
-    int lockId =
-        Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 
0) % locks.length);
-    final Object lock = locks[lockId];
-    synchronized (lock) {
-      /*we have waited for some time just check once again*/
-      cacheEntry = collectionStateCache.get(collection);
-      col = cacheEntry == null ? null : cacheEntry.cached;
-      if (col != null) {
-        if (expectedVersion <= col.getZNodeVersion() && 
!cacheEntry.shouldRetry()) return col;
-      }
-      ClusterState.CollectionRef ref = getCollectionRef(collection);
-      if (ref == null) {
-        // no such collection exists
-        return null;
-      }
-      // We are going to fetch a new version
-      // we MUST try to get a new version
-      DocCollection fetchedCol = ref.get(); // this is a call to ZK
-      if (fetchedCol == null) return null; // this collection no more exists
-      if (col != null && fetchedCol.getZNodeVersion() == 
col.getZNodeVersion()) {
-        cacheEntry.setRetriedAt(); // we retried and found that it is the same 
version
-        cacheEntry.maybeStale = false;
-      } else {
-        collectionStateCache.put(collection, new 
ExpiringCachedDocCollection(fetchedCol));
+    if (expectedVersion == null) {
+      expectedVersion = -1;
+    }
+    if (collection == null) {
+      return null;
+    }
+
+    ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(collection);
+    if (cacheEntry != null && 
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+      collectionStateCache.remove(collection, cacheEntry);
+      cacheEntry = null;
+    }
+
+    DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+
+    if (cacheEntry != null && cacheEntry.shouldRetry()) {
+      triggerCollectionRefresh(collection);
+    }
+
+    if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+      return cached;
+    }
+
+    CompletableFuture<DocCollection> refreshFuture = 
triggerCollectionRefresh(collection);
+    return waitForCollectionRefresh(collection, refreshFuture);
+  }
+
+  private CompletableFuture<DocCollection> triggerCollectionRefresh(String 
collection) {
+    if (closed) {
+      ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(collection);
+      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+      return CompletableFuture.completedFuture(cached);
+    }
+    return collectionRefreshes.computeIfAbsent(
+        collection,
+        key -> {
+          ExecutorService executor = threadPool;
+          CompletableFuture<DocCollection> future;
+          if (executor == null || ExecutorUtil.isShutdown(executor)) {
+            future = new CompletableFuture<>();
+            try {
+              future.complete(loadDocCollection(key));
+            } catch (Throwable t) {
+              future.completeExceptionally(t);
+            }
+          } else {
+            future =
+                CompletableFuture.supplyAsync(
+                    () -> {
+                      stateRefreshSemaphore.acquireUninterruptibly();
+                      try {
+                        return loadDocCollection(key);
+                      } finally {
+                        stateRefreshSemaphore.release();
+                      }
+                    },
+                    executor);
+          }
+          future.whenCompleteAsync(
+              (result, error) -> {
+                collectionRefreshes.remove(key, future);
+              });
+          return future;
+        });
+  }
+
+  private DocCollection waitForCollectionRefresh(
+      String collection, CompletableFuture<DocCollection> refreshFuture) {
+    try {
+      return refreshFuture.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Interrupted while refreshing state for collection " + collection,
+          e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SolrException) {
+        throw (SolrException) cause;
       }
-      return fetchedCol;
+      throw new SolrException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Error refreshing state for collection " + collection,
+          cause);
     }
   }
 
+  private DocCollection loadDocCollection(String collection) {
+    ClusterState.CollectionRef ref = getCollectionRef(collection);
+    if (ref == null) {
+      collectionStateCache.remove(collection);
+      return null;
+    }
+
+    DocCollection fetchedCol = ref.get();
+    if (fetchedCol == null) {
+      collectionStateCache.remove(collection);
+      return null;
+    }
+
+    ExpiringCachedDocCollection existing = 
collectionStateCache.peek(collection);
+    if (existing != null && existing.cached.getZNodeVersion() == 
fetchedCol.getZNodeVersion()) {
+      existing.setRetriedAt();
+      existing.maybeStale = false;
+      return existing.cached;
+    }
+
+    collectionStateCache.put(collection, new 
ExpiringCachedDocCollection(fetchedCol));
+    return fetchedCol;
+  }
+
   ClusterState.CollectionRef getCollectionRef(String collection) {
     return getClusterStateProvider().getState(collection);
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index dca6757b809..d2a864f4a54 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -22,24 +22,42 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.http.NoHttpResponseException;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.apache.LBHttpSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.junit.BeforeClass;
 
 public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
@@ -88,7 +106,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 
{
       livenodes.addAll(Set.of("192.168.1.108:7574_solr", 
"192.168.1.108:8983_solr"));
       ClusterState cs =
           ClusterState.createFromJson(
-              1, coll1State.getBytes(UTF_8), Collections.emptySet(), 
Instant.now(), null);
+              1, COLL1_STATE.getBytes(UTF_8), Collections.emptySet(), 
Instant.now(), null);
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put(
@@ -109,7 +127,173 @@ public class CloudSolrClientCacheTest extends 
SolrTestCaseJ4 {
       UpdateRequest update = new UpdateRequest().add("id", "123", "desc", 
"Something 0");
 
       cloudClient.request(update, collName);
-      assertEquals(2, refs.get(collName).getCount());
+      // Async refresh with deduplication means rapid retries can share the 
same Future.
+      // Race: sometimes async completes fast enough for 2 fetches, sometimes 
only 1.
+      int fetchCount = refs.get(collName).getCount();
+      assertTrue("Expected 1 or 2 fetches, got " + fetchCount, fetchCount >= 1 
&& fetchCount <= 2);
+    }
+  }
+
+  public void testStaleStateRetrySkipsStateVersionBeforeWait() throws 
Exception {
+    String collName = "gettingstarted";
+    Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+    AtomicInteger refGets = new AtomicInteger();
+    AtomicReference<DocCollection> currentDoc = new 
AtomicReference<>(loadCollection(collName, 1));
+    Map<String, ClusterState.CollectionRef> refs =
+        Map.of(collName, new TestCollectionRef(currentDoc::get, refGets, null, 
null, -1));
+    try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+        RecordingCloudSolrClient client = new 
RecordingCloudSolrClient(provider, 3)) {
+      client.enqueue(
+          (req, cols) -> {
+            throw new SolrException(SolrException.ErrorCode.INVALID_STATE, 
"stale");
+          });
+      client.enqueue((req, cols) -> null);
+
+      DummyRequest request = new DummyRequest(collName);
+      NamedList<Object> resp = client.request(request, collName);
+      assertNotNull(resp);
+
+      List<String> history = client.getStateVersionHistory();
+      assertEquals(2, history.size());
+      assertTrue(history.get(0).startsWith(collName + ":"));
+      assertNull("Second attempt should skip _stateVer_", history.get(1));
+      assertTrue("Expected refresh to be triggered", refGets.get() >= 1);
+    }
+  }
+
+  public void testDirectUpdatesToLeadersSkipStateVersionBeforeWait() throws 
Exception {
+    String collName = "gettingstarted";
+    Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+    AtomicInteger refGets = new AtomicInteger();
+    AtomicReference<DocCollection> currentDoc = new 
AtomicReference<>(loadCollection(collName, 1));
+    Map<String, ClusterState.CollectionRef> refs =
+        Map.of(collName, new TestCollectionRef(currentDoc::get, refGets, null, 
null, -1));
+    try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+        RecordingCloudSolrClient client =
+            new RecordingCloudSolrClient(provider, true, true, true, 3)) {
+      client.enqueue(
+          (req, cols) -> {
+            throw new SolrException(SolrException.ErrorCode.INVALID_STATE, 
"stale");
+          });
+      client.enqueue((req, cols) -> null);
+
+      DummyUpdateRequest request = new DummyUpdateRequest(collName);
+      NamedList<Object> resp = client.request(request, collName);
+      assertNotNull(resp);
+
+      List<String> history = client.getStateVersionHistory();
+      assertEquals(2, history.size());
+      assertTrue(history.get(0).startsWith(collName + ":"));
+      assertNull(history.get(1));
+      assertTrue(refGets.get() >= 1);
+    }
+  }
+
+  public void testStaleStateRetryWaitsAfterSkipFailure() throws Exception {
+    String collName = "gettingstarted";
+    AtomicReference<DocCollection> currentDoc = new 
AtomicReference<>(loadCollection(collName, 1));
+
+    // Track when refresh is triggered to ensure the async refresh mechanism 
is used
+    AtomicInteger refGets = new AtomicInteger();
+    TestCollectionRef ref = new TestCollectionRef(currentDoc::get, refGets, 
null, null, -1);
+    Map<String, ClusterState.CollectionRef> refs = Map.of(collName, ref);
+    Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+    try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+        RecordingCloudSolrClient client = new 
RecordingCloudSolrClient(provider, 2)) {
+      // First attempt: returns stale error, triggers skipStateVersion retry
+      client.enqueue(
+          (req, cols) -> {
+            throw new SolrException(SolrException.ErrorCode.INVALID_STATE, 
"stale-first");
+          });
+      // Second attempt (skipStateVersion retry): also returns stale error
+      client.enqueue(
+          (req, cols) -> {
+            throw new SolrException(SolrException.ErrorCode.INVALID_STATE, 
"stale-second");
+          });
+      // Third attempt (after waiting for refresh): succeeds
+      client.enqueue((req, cols) -> null);
+
+      DummyRequest request = new DummyRequest(collName);
+      NamedList<Object> resp = client.request(request, collName);
+      assertNotNull(resp);
+
+      // Verify the retry sequence:
+      // 1. First attempt with state version
+      // 2. skipStateVersion retry without state version
+      // 3. Final retry with refreshed state version
+      List<String> history = client.getStateVersionHistory();
+      assertEquals("Should have 3 attempts", 3, history.size());
+      assertTrue(
+          "First attempt should have state param", 
history.get(0).startsWith(collName + ":"));
+      assertNull("skipStateVersion attempt should NOT have state param", 
history.get(1));
+      assertTrue(
+          "Final attempt should have state param after refresh",
+          history.get(2).startsWith(collName + ":"));
+
+      // Verify refresh was triggered (at least initial load + refresh after 
stale errors)
+      assertTrue("Refresh should have been called", refGets.get() >= 2);
+    }
+  }
+
+  public void testStateRefreshThreadsConfiguredViaBuilder() throws Exception {
+    String collName = "gettingstarted";
+    AtomicReference<DocCollection> currentDoc = new 
AtomicReference<>(loadCollection(collName, 1));
+    Map<String, ClusterState.CollectionRef> refs =
+        Map.of(
+            collName, new TestCollectionRef(currentDoc::get, new 
AtomicInteger(), null, null, -1));
+    Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+    try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+        RecordingCloudSolrClient client = new 
RecordingCloudSolrClient(provider, 7)) {
+      assertEquals(7, client.getStateRefreshParallelism());
+    }
+  }
+
+  public void testConcurrentRefreshIsDeduplicated() throws Exception {
+    String collName = "gettingstarted";
+    AtomicReference<DocCollection> currentDoc = new 
AtomicReference<>(loadCollection(collName, 1));
+    AtomicInteger refGets = new AtomicInteger();
+    CountDownLatch refreshStarted = new CountDownLatch(1);
+    CountDownLatch releaseRefresh = new CountDownLatch(1);
+    Map<String, ClusterState.CollectionRef> refs =
+        Map.of(
+            collName,
+            new TestCollectionRef(currentDoc::get, refGets, refreshStarted, 
releaseRefresh, 1));
+    Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+    try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+        RecordingCloudSolrClient client = new 
RecordingCloudSolrClient(provider, 2)) {
+      AtomicInteger sendCount = new AtomicInteger();
+      client.setDefaultInvocation(
+          (req, cols) -> {
+            if (sendCount.incrementAndGet() <= 2) {
+              throw new SolrException(SolrException.ErrorCode.INVALID_STATE, 
"stale");
+            }
+            return null;
+          });
+
+      DummyRequest request = new DummyRequest(collName);
+      ExecutorService executor =
+          ExecutorUtil.newMDCAwareFixedThreadPool(
+              2, new 
SolrNamedThreadFactory("CloudSolrClientCacheTest-parallel"));
+      try {
+        Future<NamedList<Object>> first = executor.submit(() -> 
client.request(request, collName));
+        Future<NamedList<Object>> second = executor.submit(() -> 
client.request(request, collName));
+
+        assertTrue(
+            "Refresh should start within timeout", refreshStarted.await(30, 
TimeUnit.SECONDS));
+        assertEquals("Only one refresh should be in flight", 1, refGets.get());
+        releaseRefresh.countDown();
+
+        NamedList<Object> firstResp = first.get(30, TimeUnit.SECONDS);
+        NamedList<Object> secondResp = second.get(30, TimeUnit.SECONDS);
+
+        assertNotNull(firstResp);
+        assertNotNull(secondResp);
+      } finally {
+        ExecutorUtil.shutdownAndAwaitTermination(executor);
+      }
     }
   }
 
@@ -159,7 +343,234 @@ public class CloudSolrClientCacheTest extends 
SolrTestCaseJ4 {
     };
   }
 
-  private String coll1State =
+  /**
+   * Parses the {@code COLL1_STATE} JSON fixture into a {@link ClusterState} and looks up a single
+   * collection in it.
+   *
+   * @param collection name of the collection to look up in the parsed state
+   * @param version version number passed through to {@code ClusterState.createFromJson}
+   * @return the result of {@code getCollectionOrNull(collection)} on the parsed state
+   * @throws Exception if building the cluster state from the fixture fails
+   */
+  private DocCollection loadCollection(String collection, int version) throws Exception {
+    byte[] stateJson = COLL1_STATE.getBytes(UTF_8);
+    Instant now = Instant.now();
+    ClusterState parsed =
+        ClusterState.createFromJson(version, stateJson, Collections.emptySet(), now, null);
+    return parsed.getCollectionOrNull(collection);
+  }
+
+  /**
+   * {@link CloudSolrClient} test double whose {@code sendRequest} is answered by scripted {@link
+   * Invocation}s instead of the superclass implementation. Queued invocations are consumed first,
+   * then the default invocation; with neither present a canned success response is returned. The
+   * {@code STATE_VERSION} request parameter seen on every call is recorded for later inspection.
+   */
+  private static class RecordingCloudSolrClient extends CloudSolrClient implements AutoCloseable {
+    private final ClusterStateProvider provider;
+    // One-shot scripted behaviours, polled in FIFO order before the default is consulted.
+    private final ConcurrentLinkedQueue<Invocation> invocations = new ConcurrentLinkedQueue<>();
+    // Fallback behaviour used whenever the queue is empty; may be null.
+    private volatile Invocation defaultInvocation;
+    // STATE_VERSION value (null when absent) observed on each sendRequest call, in call order.
+    private final List<String> stateHistory = Collections.synchronizedList(new ArrayList<>());
+    // Canned response: {"responseHeader": {"status": 0}}.
+    private final NamedList<Object> okResponse;
+
+    /**
+     * Convenience constructor: updates to leaders and parallel updates enabled, direct updates to
+     * leaders only disabled.
+     */
+    RecordingCloudSolrClient(ClusterStateProvider provider, int refreshThreads) {
+      this(provider, true, true, false, refreshThreads);
+    }
+
+    /**
+     * Forwards the routing flags and refresh thread count to the {@link CloudSolrClient}
+     * constructor, remembers the provider and builds the canned success response.
+     */
+    RecordingCloudSolrClient(
+        ClusterStateProvider provider,
+        boolean updatesToLeaders,
+        boolean parallelUpdates,
+        boolean directUpdatesToLeadersOnly,
+        int refreshThreads) {
+      super(updatesToLeaders, parallelUpdates, directUpdatesToLeadersOnly, refreshThreads);
+      this.provider = provider;
+      NamedList<Object> responseHeader = new NamedList<>();
+      responseHeader.add("status", 0);
+      NamedList<Object> ok = new NamedList<>();
+      ok.add("responseHeader", responseHeader);
+      this.okResponse = ok;
+    }
+
+    /** Appends a one-shot invocation that will answer a future {@code sendRequest} call. */
+    void enqueue(Invocation invocation) {
+      invocations.add(invocation);
+    }
+
+    /** Sets the invocation used when no queued invocation is available. */
+    void setDefaultInvocation(Invocation invocation) {
+      this.defaultInvocation = invocation;
+    }
+
+    /** Returns a copy of the recorded {@code STATE_VERSION} values; entries may be null. */
+    List<String> getStateVersionHistory() {
+      List<String> snapshot;
+      // Hold the list's own monitor while copying so concurrent adds cannot interleave.
+      synchronized (stateHistory) {
+        snapshot = new ArrayList<>(stateHistory);
+      }
+      return snapshot;
+    }
+
+    /**
+     * Records the request's {@code STATE_VERSION} parameter, then delegates to the next queued
+     * invocation, or to the default one when the queue is empty.
+     *
+     * @return the invocation's response, or the canned success response when there is no
+     *     invocation or the invocation returns null
+     * @throws SolrServerException as thrown by the invocation, or wrapping any other checked or
+     *     unchecked exception it throws that is not an {@link IOException} or {@link SolrException}
+     * @throws IOException as thrown by the invocation
+     */
+    @Override
+    protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String> inputCollections)
+        throws SolrServerException, IOException {
+      String stateVersion = null;
+      if (request.getParams() != null) {
+        stateVersion = request.getParams().get(STATE_VERSION);
+      }
+      stateHistory.add(stateVersion);
+
+      Invocation queued = invocations.poll();
+      Invocation chosen = queued != null ? queued : defaultInvocation;
+      if (chosen == null) {
+        return okResponse;
+      }
+
+      NamedList<Object> scripted;
+      try {
+        scripted = chosen.invoke(request, inputCollections);
+      } catch (SolrServerException | IOException | SolrException e) {
+        // These types are passed through untouched so callers see exactly what was scripted.
+        throw e;
+      } catch (Exception e) {
+        throw new SolrServerException(e);
+      }
+      return scripted != null ? scripted : okResponse;
+    }
+
+    /** Always throws: this harness overrides {@code sendRequest}, so no LB client is provided. */
+    @Override
+    protected LBSolrClient getLbClient() {
+      throw new UnsupportedOperationException("LB client not used in test harness");
+    }
+
+    @Override
+    public ClusterStateProvider getClusterStateProvider() {
+      return provider;
+    }
+
+    /** Scripted stand-in for one {@code sendRequest} call; may return null or throw. */
+    @FunctionalInterface
+    interface Invocation {
+      NamedList<Object> invoke(SolrRequest<?> request, List<String> inputCollections)
+          throws Exception;
+    }
+  }
+
+  /**
+   * Minimal {@link SolrRequest} stub: a GET against {@code /dummy} of type {@code UNSPECIFIED},
+   * bound to a fixed collection and carrying an initially empty, mutable parameter set.
+   */
+  private static class DummyRequest extends SolrRequest<NamedList<Object>> {
+    private final String collection;
+    private final ModifiableSolrParams params = new ModifiableSolrParams();
+
+    DummyRequest(String collection) {
+      super(METHOD.GET, "/dummy", SolrRequestType.UNSPECIFIED);
+      this.collection = collection;
+    }
+
+    /** Always true: this request declares that it needs a collection. */
+    @Override
+    public boolean requiresCollection() {
+      return true;
+    }
+
+    /** Returns the collection name supplied at construction. */
+    @Override
+    public String getCollection() {
+      return collection;
+    }
+
+    @Override
+    public SolrRequestType getRequestType() {
+      return SolrRequestType.UNSPECIFIED;
+    }
+
+    /** Returns the same mutable parameter object on every call. */
+    @Override
+    public ModifiableSolrParams getParams() {
+      return params;
+    }
+
+    /** This stub has no body, so no content streams are returned (null). */
+    @Override
+    public Collection<ContentStream> getContentStreams() {
+      return null;
+    }
+
+    /** Hands the raw response list back unchanged. */
+    @Override
+    protected NamedList<Object> createResponse(NamedList<Object> namedList) {
+      return namedList;
+    }
+  }
+
+  /** {@link DummyRequest} variant that reports its request type as {@code UPDATE}. */
+  private static class DummyUpdateRequest extends DummyRequest {
+    DummyUpdateRequest(String collection) {
+      super(collection);
+    }
+
+    /** Overrides the parent's {@code UNSPECIFIED} type with {@code UPDATE}. */
+    @Override
+    public SolrRequestType getRequestType() {
+      return SolrRequestType.UPDATE;
+    }
+  }
+
+  /**
+   * {@link ClusterState.CollectionRef} for tests that reports itself as lazily loaded, counts
+   * every {@link #get()} call on a shared counter and can park the calling thread at up to two
+   * chosen call counts until the test releases it. The returned collection always comes from the
+   * supplied {@link Supplier}.
+   */
+  private static class TestCollectionRef extends ClusterState.CollectionRef {
+    private final Supplier<DocCollection> supplier;
+    private final AtomicInteger counter;
+    // Phase one: counted down when the counter hits phaseOneCount, then get() waits on "proceed".
+    private final CountDownLatch phaseOneReady;
+    private final CountDownLatch phaseOneProceed;
+    // Phase two: counted down when the counter hits phaseTwoCount, then get() waits on "proceed".
+    private final CountDownLatch phaseTwoStarted;
+    private final CountDownLatch phaseTwoProceed;
+    // Call counts at which each phase fires; a value <= 0 disables that phase.
+    private final int phaseOneCount;
+    private final int phaseTwoCount;
+    // Guards ensuring each phase fires at most once.
+    private final AtomicBoolean phaseOneTriggered = new AtomicBoolean(false);
+    private final AtomicBoolean phaseTwoTriggered = new AtomicBoolean(false);
+
+    /**
+     * Two-phase constructor giving the test two independent pause points in {@link #get()}. Any
+     * latch may be null, in which case the corresponding signal or wait is skipped.
+     */
+    TestCollectionRef(
+        Supplier<DocCollection> supplier,
+        AtomicInteger counter,
+        CountDownLatch phaseOneReady,
+        CountDownLatch phaseOneProceed,
+        CountDownLatch phaseTwoStarted,
+        CountDownLatch phaseTwoProceed,
+        int phaseOneCount,
+        int phaseTwoCount) {
+      super(null);
+      this.supplier = supplier;
+      this.counter = counter;
+      this.phaseOneReady = phaseOneReady;
+      this.phaseOneProceed = phaseOneProceed;
+      this.phaseTwoStarted = phaseTwoStarted;
+      this.phaseTwoProceed = phaseTwoProceed;
+      this.phaseOneCount = phaseOneCount;
+      this.phaseTwoCount = phaseTwoCount;
+    }
+
+    /**
+     * Single-pause constructor: phase one is disabled (null latches, count -1) and the given
+     * latches and {@code blockAtCount} drive phase two.
+     */
+    TestCollectionRef(
+        Supplier<DocCollection> supplier,
+        AtomicInteger counter,
+        CountDownLatch startLatch,
+        CountDownLatch waitLatch,
+        int blockAtCount) {
+      this(supplier, counter, null, null, startLatch, waitLatch, -1, blockAtCount);
+    }
+
+    @Override
+    public boolean isLazilyLoaded() {
+      return true;
+    }
+
+    /**
+     * Increments the counter, runs phase one and then phase two if the new count matches their
+     * configured values, and finally returns the supplier's current collection.
+     */
+    @Override
+    public DocCollection get() {
+      int callNumber = counter.incrementAndGet();
+
+      if (shouldFire(callNumber, phaseOneCount, phaseOneTriggered)) {
+        signalAndAwait(phaseOneReady, phaseOneProceed);
+      }
+      if (shouldFire(callNumber, phaseTwoCount, phaseTwoTriggered)) {
+        signalAndAwait(phaseTwoStarted, phaseTwoProceed);
+      }
+
+      return supplier.get();
+    }
+
+    /** True when the phase is enabled, this call hits its count, and it has not fired before. */
+    private static boolean shouldFire(int callNumber, int targetCount, AtomicBoolean fired) {
+      return targetCount > 0 && callNumber == targetCount && fired.compareAndSet(false, true);
+    }
+
+    /**
+     * Counts down {@code reached} (if non-null) and then waits, without timeout, on {@code
+     * proceed} (if non-null). An interrupt restores the thread's interrupt flag and is rethrown
+     * wrapped in a {@link RuntimeException}.
+     */
+    private static void signalAndAwait(CountDownLatch reached, CountDownLatch proceed) {
+      if (reached != null) {
+        reached.countDown();
+      }
+      if (proceed == null) {
+        return;
+      }
+      try {
+        proceed.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static final String COLL1_STATE =
       "{'gettingstarted':{\n"
           + "    'replicationFactor':'2',\n"
           + "    'router':{'name':'compositeId'},\n"


Reply via email to