This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch branch_10_0
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10_0 by this push:
new 8c4eca8db20 SOLR-17947: CloudSolrClient refreshes collection state
asynchronously using a dedicated thread pool to reduce ZooKeeper blocking under
load. (#3962)
8c4eca8db20 is described below
commit 8c4eca8db20b461129856b30dc07d179ee1f2439
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Dec 17 15:48:29 2025 -0600
SOLR-17947: CloudSolrClient refreshes collection state asynchronously using
a dedicated thread pool to reduce ZooKeeper blocking under load. (#3962)
---
...R-17947-cloudsolrclient async state refresh.yml | 8 +
.../client/solrj/impl/CloudHttp2SolrClient.java | 10 +-
.../solr/client/solrj/impl/CloudSolrClient.java | 305 ++++++++++++---
.../solrj/impl/CloudSolrClientCacheTest.java | 417 ++++++++++++++++++++-
4 files changed, 674 insertions(+), 66 deletions(-)
diff --git a/changelog/unreleased/SOLR-17947-cloudsolrclient async state
refresh.yml b/changelog/unreleased/SOLR-17947-cloudsolrclient async state
refresh.yml
new file mode 100644
index 00000000000..da7aa452ee1
--- /dev/null
+++ b/changelog/unreleased/SOLR-17947-cloudsolrclient async state refresh.yml
@@ -0,0 +1,8 @@
+title: CloudSolrClient now refreshes collection state asynchronously using a
dedicated
+ thread pool, reducing ZooKeeper blocking and improving performance under
load.
+type: changed
+authors:
+- name: Mark Miller
+links:
+- name: SOLR-17947
+ url: https://issues.apache.org/jira/browse/SOLR-17947
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index 1eb61d4c9c5..72cfd2d042e 100644
---
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -44,7 +44,11 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
* @param builder a {@link CloudSolrClient.Builder} with the options used to
create the client.
*/
protected CloudHttp2SolrClient(Builder builder) {
- super(builder.shardLeadersOnly, builder.parallelUpdates,
builder.directUpdatesToLeadersOnly);
+ super(
+ builder.shardLeadersOnly,
+ builder.parallelUpdates,
+ builder.directUpdatesToLeadersOnly,
+ builder.parallelCacheRefreshesLocks);
this.clientIsInternal = builder.httpClient == null;
try {
this.myClient = builder.createOrGetHttpClient();
@@ -65,10 +69,6 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
this.collectionStateCache.timeToLiveMs =
TimeUnit.MILLISECONDS.convert(builder.timeToLiveSeconds,
TimeUnit.SECONDS);
-
- // If caches are expired then they are refreshed after acquiring a lock.
Set the number of
- // locks.
- this.locks = objectList(builder.parallelCacheRefreshesLocks);
}
private ClusterStateProvider createClusterStateProvider(Builder builder) {
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 6eabbf6ddb0..c9199fdafff 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -37,10 +37,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -74,7 +76,6 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -97,6 +98,7 @@ public abstract class CloudSolrClient extends SolrClient {
// no of times collection state to be reloaded if stale state error is
received
private static final int MAX_STALE_RETRIES =
Integer.parseInt(System.getProperty("solr.solrj.cloud.max.stale.retries", "5"));
+ static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5; // default refresh permits: used by the Builder field and the 3-arg constructor
private final Random rand = new Random();
private final boolean updatesToLeaders;
@@ -125,7 +127,11 @@ public abstract class CloudSolrClient extends SolrClient {
// UpdateParams.ROLLBACK
);
- protected volatile Object[] locks = objectList(3);
+ private final ConcurrentHashMap<String, CompletableFuture<DocCollection>>
collectionRefreshes =
+ new ConcurrentHashMap<>(); // in-flight refresh per collection name; concurrent callers share one future
+ private final Semaphore stateRefreshSemaphore; // caps concurrent loadDocCollection calls on the thread pool
+ private final int stateRefreshParallelism; // permit count, clamped to >= 1 in the constructor
+ private volatile boolean closed; // set by close(); once true no new refresh is started
/**
* Constructs {@link CloudSolrClient} instances from provided configuration.
It will use a Jetty
@@ -175,7 +181,7 @@ public abstract class CloudSolrClient extends SolrClient {
protected String defaultCollection;
protected long timeToLiveSeconds = 60;
- protected int parallelCacheRefreshesLocks = 3;
+ protected int parallelCacheRefreshesLocks =
DEFAULT_STATE_REFRESH_PARALLELISM;
protected int zkConnectTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
protected int zkClientTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
protected boolean canUseZkACLs = true;
@@ -323,10 +329,10 @@ public abstract class CloudSolrClient extends SolrClient {
}
/**
- * When caches are expired then they are refreshed after acquiring a lock.
Use this to set the
- * number of locks.
+ * Configures how many collection state refresh operations may run in
parallel using a dedicated
+ * thread pool. This controls the maximum number of concurrent
ZooKeeper/cluster state lookups.
*
- * <p>Defaults to 3.
+ * <p>Defaults to 5. Values below 1 are treated as 1.
*/
public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks)
{
this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
@@ -512,6 +518,10 @@ public abstract class CloudSolrClient extends SolrClient {
return val;
}
+ ExpiringCachedDocCollection peek(Object key) { // raw lookup through super.get(), bypassing any get() override in this class
+ return super.get(key); // does no expiry handling itself; getDocCollection checks isExpired() after peeking
+ }
+
@Override
public ExpiringCachedDocCollection put(String key,
ExpiringCachedDocCollection value) {
puts.incrementAndGet();
@@ -565,14 +575,46 @@ public abstract class CloudSolrClient extends SolrClient {
void setRetriedAt() {
retriedAtNano = System.nanoTime();
}
+
+ /**
+ * Marks this entry as {@code maybeStale} if the provided backoff window
has elapsed since the
+ * last retry.
+ *
+ * <p>A {@code retriedAtNano} of {@code -1} (presumably the initial "never retried" value;
+ * confirm) always counts as outside the window. This method takes no lock.
+ *
+ * @param retryBackoffNano backoff window in nanoseconds, compared against System.nanoTime()
+ * @return {@code true} if the entry is flagged as maybe stale after this call (also when it
+ * already was); {@code false} while the time since the last retry is at most the window
+ */
+ boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
+ if (maybeStale) {
+ return true;
+ }
+ long lastRetry = retriedAtNano;
+ if (lastRetry != -1 && (System.nanoTime() - lastRetry) <=
retryBackoffNano) {
+ return false;
+ }
+ maybeStale = true;
+ return true;
+ }
}
protected CloudSolrClient(
boolean updatesToLeaders, boolean parallelUpdates, boolean
directUpdatesToLeadersOnly) {
+ this( // uses DEFAULT_STATE_REFRESH_PARALLELISM for the refresh permits
+ updatesToLeaders,
+ parallelUpdates,
+ directUpdatesToLeadersOnly,
+ DEFAULT_STATE_REFRESH_PARALLELISM);
+ }
+ /**
+ * Like the three-argument constructor, additionally setting how many collection-state refreshes
+ * may load concurrently: {@code stateRefreshThreads} (clamped to at least 1) becomes the permit
+ * count of the refresh semaphore.
+ */
+ protected CloudSolrClient(
+ boolean updatesToLeaders,
+ boolean parallelUpdates,
+ boolean directUpdatesToLeadersOnly,
+ int stateRefreshThreads) {
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
+ this.stateRefreshParallelism = Math.max(1, stateRefreshThreads); // clamp: with 0 permits every refresh would wait forever in acquireUninterruptibly()
+ this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
}
protected abstract LBSolrClient getLbClient();
@@ -598,6 +640,8 @@ public abstract class CloudSolrClient extends SolrClient {
@Override
public void close() {
+ closed = true;
+ collectionRefreshes.clear();
if (this.threadPool != null && !ExecutorUtil.isShutdown(this.threadPool)) {
ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
this.threadPool = null;
@@ -1081,7 +1125,13 @@ public abstract class CloudSolrClient extends SolrClient
{
List<String> inputCollections =
collection == null ? Collections.emptyList() :
StrUtils.splitSmart(collection, ",", true);
- return requestWithRetryOnStaleState(request, 0, inputCollections);
+ return requestWithRetryOnStaleState(
+ request,
+ 0,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ false);
}
/**
@@ -1090,7 +1140,12 @@ public abstract class CloudSolrClient extends SolrClient
{
* and retried.
*/
protected NamedList<Object> requestWithRetryOnStaleState(
- SolrRequest<?> request, int retryCount, List<String> inputCollections)
+ SolrRequest<?> request,
+ int retryCount,
+ List<String> inputCollections,
+ boolean skipStateVersion,
+ Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
+ boolean waitedForRefresh)
throws SolrServerException, IOException {
// build up a _stateVer_ param to pass to the server containing all the
// external collection state versions involved in this request, which
allows
@@ -1139,7 +1194,7 @@ public abstract class CloudSolrClient extends SolrClient {
}
if (request.getParams() instanceof ModifiableSolrParams params) {
- if (stateVerParam != null) {
+ if (!skipStateVersion && stateVerParam != null) {
params.set(STATE_VERSION, stateVerParam);
} else {
params.remove(STATE_VERSION);
@@ -1201,9 +1256,21 @@ public abstract class CloudSolrClient extends SolrClient
{
// in retryExpiryTime time
if (requestedCollections != null) {
for (DocCollection ext : requestedCollections) {
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.get(ext.getName());
- if (cacheEntry == null) continue;
- cacheEntry.maybeStale = true;
+ String name = ext.getName();
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(name);
+ if (cacheEntry != null) {
+ if (wasCommError) {
+ cacheEntry.maybeStale = true;
+ } else {
+ boolean markedStale =
+
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
+ if (markedStale && cacheEntry.shouldRetry()) {
+ triggerCollectionRefresh(name);
+ }
+ }
+ } else {
+ triggerCollectionRefresh(name);
+ }
}
}
if (retryCount < MAX_STALE_RETRIES) { // if it is a communication
error , we must try again
@@ -1220,7 +1287,13 @@ public abstract class CloudSolrClient extends SolrClient
{
MAX_STALE_RETRIES,
wasCommError,
errorCode);
- return requestWithRetryOnStaleState(request, retryCount + 1,
inputCollections);
+ return requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ skipStateVersion,
+ pendingRefreshes,
+ waitedForRefresh);
}
} else {
log.info("request was not communication error it seems");
@@ -1272,16 +1345,57 @@ public abstract class CloudSolrClient extends
SolrClient {
}
}
- if (requestedCollections != null) {
- requestedCollections.clear(); // done with this
- }
-
// if the state was stale, then we retry the request once with new state
pulled from Zk
if (stateWasStale) {
log.warn(
"Re-trying request to collection(s) {} after stale state error
from server.",
inputCollections);
- resp = requestWithRetryOnStaleState(request, retryCount + 1,
inputCollections);
+
+ Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor =
pendingRefreshes;
+ if (!waitedForRefresh && (pendingRefreshes == null ||
pendingRefreshes.isEmpty())) {
+ refreshesToWaitFor = new HashMap<>();
+ for (DocCollection ext : requestedCollections) {
+ refreshesToWaitFor.put(ext.getName(),
triggerCollectionRefresh(ext.getName()));
+ }
+ }
+
+ // First retry without sending state versions so the server does not
immediately reject the
+ // request while we intentionally rely on stale routing (e.g., to
allow forwarding to a new
+ // leader) as the background refresh completes.
+ if (!skipStateVersion && !waitedForRefresh) {
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ true,
+ refreshesToWaitFor,
+ waitedForRefresh);
+ } else if (!waitedForRefresh
+ && refreshesToWaitFor != null
+ && !refreshesToWaitFor.isEmpty()) {
+ for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
+ refreshesToWaitFor.entrySet()) {
+ waitForCollectionRefresh(entry.getKey(), entry.getValue());
+ }
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ true);
+ } else {
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ waitedForRefresh);
+ }
} else {
if (exc instanceof SolrException
|| exc instanceof SolrServerException
@@ -1291,6 +1405,10 @@ public abstract class CloudSolrClient extends SolrClient
{
throw new SolrServerException(rootCause);
}
}
+
+ if (requestedCollections != null) {
+ requestedCollections.clear(); // done with this
+ }
}
return resp;
@@ -1505,54 +1623,125 @@ public abstract class CloudSolrClient extends
SolrClient {
return directUpdatesToLeadersOnly;
}
- protected static Object[] objectList(int n) {
- Object[] l = new Object[n];
- for (int i = 0; i < n; i++) {
- l[i] = new Object();
- }
- return l;
+ /**
+ * Returns the number of concurrent state-refresh permits: the constructor's
+ * {@code stateRefreshThreads} clamped to at least 1. Visible for tests so they can assert the
+ * configured refresh parallelism.
*/
+ protected int getStateRefreshParallelism() {
+ return stateRefreshParallelism;
}
protected DocCollection getDocCollection(String collection, Integer
expectedVersion)
throws SolrException {
- if (expectedVersion == null) expectedVersion = -1;
- if (collection == null) return null;
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.get(collection);
- DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
- if (col != null) {
- if (expectedVersion <= col.getZNodeVersion() &&
!cacheEntry.shouldRetry()) return col;
- }
-
- Object[] locks = this.locks;
- int lockId =
- Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(),
0) % locks.length);
- final Object lock = locks[lockId];
- synchronized (lock) {
- /*we have waited for some time just check once again*/
- cacheEntry = collectionStateCache.get(collection);
- col = cacheEntry == null ? null : cacheEntry.cached;
- if (col != null) {
- if (expectedVersion <= col.getZNodeVersion() &&
!cacheEntry.shouldRetry()) return col;
- }
- ClusterState.CollectionRef ref = getCollectionRef(collection);
- if (ref == null) {
- // no such collection exists
- return null;
- }
- // We are going to fetch a new version
- // we MUST try to get a new version
- DocCollection fetchedCol = ref.get(); // this is a call to ZK
- if (fetchedCol == null) return null; // this collection no more exists
- if (col != null && fetchedCol.getZNodeVersion() ==
col.getZNodeVersion()) {
- cacheEntry.setRetriedAt(); // we retried and found that it is the same
version
- cacheEntry.maybeStale = false;
- } else {
- collectionStateCache.put(collection, new
ExpiringCachedDocCollection(fetchedCol));
+ if (expectedVersion == null) { // no minimum version requested
+ expectedVersion = -1;
+ }
+ if (collection == null) {
+ return null;
+ }
+ /* Read via peek() and apply expiry here: an expired entry is dropped with the conditional
+ * remove(key, value), so only this same entry is removed, and is then treated as a miss. */
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
+ if (cacheEntry != null &&
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+ collectionStateCache.remove(collection, cacheEntry);
+ cacheEntry = null;
+ }
+
+ DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+ // shouldRetry(): start a background refresh, but do not wait for it at this point.
+ if (cacheEntry != null && cacheEntry.shouldRetry()) {
+ triggerCollectionRefresh(collection);
+ }
+ // Cached copy satisfies expectedVersion: serve it even if a refresh is running.
+ if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+ return cached;
+ }
+ /* Miss, expired, or older than expectedVersion: join (or start) the shared refresh and
+ * block on it. The result (possibly null) is returned without re-checking expectedVersion. */
+ CompletableFuture<DocCollection> refreshFuture =
triggerCollectionRefresh(collection);
+ return waitForCollectionRefresh(collection, refreshFuture);
+ }
+ /**
+ * Returns the in-flight state refresh for {@code collection}, starting one if none is registered.
+ * Callers racing on the same collection share one future via {@code computeIfAbsent}; the map
+ * entry is removed once that future completes, so a later call starts a new load.
+ *
+ * <p>After {@link #close()} nothing is started: an already-completed future holding the cached
+ * value (possibly {@code null}) is returned. If {@code threadPool} is {@code null} or shut down,
+ * the load runs synchronously inside the {@code computeIfAbsent} mapping function, which may block
+ * some concurrent updates to {@code collectionRefreshes} while it runs. Otherwise it runs on
+ * {@code threadPool} after acquiring a {@code stateRefreshSemaphore} permit.
+ *
+ * <p>NOTE(review): on the synchronous path the future is already complete when the cleanup
+ * callback is registered, so a plain {@code whenComplete} would remove from
+ * {@code collectionRefreshes} inside its own {@code computeIfAbsent} call. {@link ConcurrentHashMap}
+ * does not allow that, which is presumably why {@code whenCompleteAsync} (default async executor)
+ * is used. If the pool is shut down between the isShutdown check and submission (e.g. by a
+ * concurrent close()), supplyAsync may throw a rejection instead of returning a future.
+ */
+ private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
+ if (closed) { // after close(): start nothing, hand back the cached value (possibly null)
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
+ DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+ return CompletableFuture.completedFuture(cached);
+ }
+ return collectionRefreshes.computeIfAbsent(
+ collection,
+ key -> {
+ ExecutorService executor = threadPool;
+ CompletableFuture<DocCollection> future;
+ if (executor == null || ExecutorUtil.isShutdown(executor)) { // no usable pool: load on the calling thread
+ future = new CompletableFuture<>();
+ try {
+ future.complete(loadDocCollection(key));
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ } else {
+ future =
+ CompletableFuture.supplyAsync(
+ () -> {
+ stateRefreshSemaphore.acquireUninterruptibly(); // at most stateRefreshParallelism loads run at once
+ try {
+ return loadDocCollection(key);
+ } finally {
+ stateRefreshSemaphore.release();
+ }
+ },
+ executor);
+ }
+ future.whenCompleteAsync( // drop the map entry once done; async on purpose (see NOTE above)
+ (result, error) -> {
+ collectionRefreshes.remove(key, future); // only if the entry is still this future
+ });
+ return future;
+ });
+ }
+ /**
+ * Waits, with no timeout, for {@code refreshFuture} and returns its value, which may be
+ * {@code null} (e.g. the collection was not found). The shared future is not cancelled on
+ * interrupt, so other callers waiting on it are unaffected.
+ *
+ * @throws SolrException SERVER_ERROR if interrupted (the interrupt flag is restored); the load's
+ * own {@link SolrException} if it threw one; otherwise SERVER_ERROR wrapping the cause
+ */
+ private DocCollection waitForCollectionRefresh(
+ String collection, CompletableFuture<DocCollection> refreshFuture) {
+ try {
+ return refreshFuture.get(); // no timeout: blocks until the load finishes
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted while refreshing state for collection " + collection,
+ e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SolrException) {
+ throw (SolrException) cause;
}
- return fetchedCol;
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Error refreshing state for collection " + collection,
+ cause);
}
}
+ private DocCollection loadDocCollection(String collection) { // fetch current state and reconcile collectionStateCache; null if absent
+ ClusterState.CollectionRef ref = getCollectionRef(collection);
+ if (ref == null) { // unknown to the provider: drop any cached entry
+ collectionStateCache.remove(collection);
+ return null;
+ }
+ // May read from ZooKeeper.
+ DocCollection fetchedCol = ref.get();
+ if (fetchedCol == null) { // collection no longer exists: drop any cached entry
+ collectionStateCache.remove(collection);
+ return null;
+ }
+ // Same znode version as cached: keep the entry, stamp the retry time, clear maybeStale.
+ ExpiringCachedDocCollection existing =
collectionStateCache.peek(collection);
+ if (existing != null && existing.cached.getZNodeVersion() ==
fetchedCol.getZNodeVersion()) {
+ existing.setRetriedAt();
+ existing.maybeStale = false;
+ return existing.cached;
+ }
+ // Otherwise cache a fresh entry for the fetched (different or first-seen) version.
+ collectionStateCache.put(collection, new
ExpiringCachedDocCollection(fetchedCol));
+ return fetchedCol;
+ }
+
ClusterState.CollectionRef getCollectionRef(String collection) {
return getClusterStateProvider().getState(collection);
}
diff --git
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index dca6757b809..d2a864f4a54 100644
---
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -22,24 +22,42 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.http.NoHttpResponseException;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.apache.LBHttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.junit.BeforeClass;
public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
@@ -88,7 +106,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4
{
livenodes.addAll(Set.of("192.168.1.108:7574_solr",
"192.168.1.108:8983_solr"));
ClusterState cs =
ClusterState.createFromJson(
- 1, coll1State.getBytes(UTF_8), Collections.emptySet(),
Instant.now(), null);
+ 1, COLL1_STATE.getBytes(UTF_8), Collections.emptySet(),
Instant.now(), null);
refs.put(collName, new Ref(collName));
colls.put(collName, cs.getCollectionOrNull(collName));
responses.put(
@@ -109,7 +127,173 @@ public class CloudSolrClientCacheTest extends
SolrTestCaseJ4 {
UpdateRequest update = new UpdateRequest().add("id", "123", "desc",
"Something 0");
cloudClient.request(update, collName);
- assertEquals(2, refs.get(collName).getCount());
+ // Async refresh with deduplication means rapid retries can share the
same Future.
+ // Race: sometimes async completes fast enough for 2 fetches, sometimes
only 1.
+ int fetchCount = refs.get(collName).getCount();
+ assertTrue("Expected 1 or 2 fetches, got " + fetchCount, fetchCount >= 1
&& fetchCount <= 2);
+ }
+ }
+ /**
+ * After one INVALID_STATE (stale state) failure, the retry should be sent without _stateVer_:
+ * the first send carries "collection:..." and the second carries no state param.
+ */
+ public void testStaleStateRetrySkipsStateVersionBeforeWait() throws
Exception {
+ String collName = "gettingstarted";
+ Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+ AtomicInteger refGets = new AtomicInteger();
+ AtomicReference<DocCollection> currentDoc = new
AtomicReference<>(loadCollection(collName, 1));
+ Map<String, ClusterState.CollectionRef> refs =
+ Map.of(collName, new TestCollectionRef(currentDoc::get, refGets, null,
null, -1));
+ try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+ RecordingCloudSolrClient client = new
RecordingCloudSolrClient(provider, 3)) {
+ client.enqueue(
+ (req, cols) -> {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"stale");
+ });
+ client.enqueue((req, cols) -> null); // second send succeeds (null -> canned OK response)
+
+ DummyRequest request = new DummyRequest(collName);
+ NamedList<Object> resp = client.request(request, collName);
+ assertNotNull(resp);
+
+ List<String> history = client.getStateVersionHistory();
+ assertEquals(2, history.size());
+ assertTrue(history.get(0).startsWith(collName + ":"));
+ assertNull("Second attempt should skip _stateVer_", history.get(1));
+ assertTrue("Expected refresh to be triggered", refGets.get() >= 1);
+ }
+ }
+ /**
+ * Same scenario as testStaleStateRetrySkipsStateVersionBeforeWait, but with an UPDATE-type
+ * request on a client created with directUpdatesToLeadersOnly = true.
+ */
+ public void testDirectUpdatesToLeadersSkipStateVersionBeforeWait() throws
Exception {
+ String collName = "gettingstarted";
+ Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+ AtomicInteger refGets = new AtomicInteger();
+ AtomicReference<DocCollection> currentDoc = new
AtomicReference<>(loadCollection(collName, 1));
+ Map<String, ClusterState.CollectionRef> refs =
+ Map.of(collName, new TestCollectionRef(currentDoc::get, refGets, null,
null, -1));
+ try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+ RecordingCloudSolrClient client =
+ new RecordingCloudSolrClient(provider, true, true, true, 3)) { // directUpdatesToLeadersOnly = true
+ client.enqueue(
+ (req, cols) -> {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"stale");
+ });
+ client.enqueue((req, cols) -> null);
+
+ DummyUpdateRequest request = new DummyUpdateRequest(collName);
+ NamedList<Object> resp = client.request(request, collName);
+ assertNotNull(resp);
+
+ List<String> history = client.getStateVersionHistory();
+ assertEquals(2, history.size());
+ assertTrue(history.get(0).startsWith(collName + ":"));
+ assertNull(history.get(1)); // the retry after the stale error omits _stateVer_
+ assertTrue(refGets.get() >= 1);
+ }
+ }
+ /**
+ * If the version-less retry after a stale-state error also comes back stale, the client should
+ * wait for the pending refresh and retry once more with _stateVer_ sent again (three sends
+ * total).
+ */
+ public void testStaleStateRetryWaitsAfterSkipFailure() throws Exception {
+ String collName = "gettingstarted";
+ AtomicReference<DocCollection> currentDoc = new
AtomicReference<>(loadCollection(collName, 1));
+
+ // Track when refresh is triggered to ensure the async refresh mechanism
is used
+ AtomicInteger refGets = new AtomicInteger();
+ TestCollectionRef ref = new TestCollectionRef(currentDoc::get, refGets,
null, null, -1);
+ Map<String, ClusterState.CollectionRef> refs = Map.of(collName, ref);
+ Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+ try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+ RecordingCloudSolrClient client = new
RecordingCloudSolrClient(provider, 2)) {
+ // First attempt: returns stale error, triggers skipStateVersion retry
+ client.enqueue(
+ (req, cols) -> {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"stale-first");
+ });
+ // Second attempt (skipStateVersion retry): also returns stale error
+ client.enqueue(
+ (req, cols) -> {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"stale-second");
+ });
+ // Third attempt (after waiting for refresh): succeeds
+ client.enqueue((req, cols) -> null);
+
+ DummyRequest request = new DummyRequest(collName);
+ NamedList<Object> resp = client.request(request, collName);
+ assertNotNull(resp);
+
+ // Verify the retry sequence:
+ // 1. First attempt with state version
+ // 2. skipStateVersion retry without state version
+ // 3. Final retry with refreshed state version
+ List<String> history = client.getStateVersionHistory();
+ assertEquals("Should have 3 attempts", 3, history.size());
+ assertTrue(
+ "First attempt should have state param",
history.get(0).startsWith(collName + ":"));
+ assertNull("skipStateVersion attempt should NOT have state param",
history.get(1));
+ assertTrue(
+ "Final attempt should have state param after refresh",
+ history.get(2).startsWith(collName + ":"));
+
+ // Verify refresh was triggered (at least initial load + refresh after
stale errors)
+ assertTrue("Refresh should have been called", refGets.get() >= 2);
+ }
+ }
+ /**
+ * The configured refresh thread count should be reported back by getStateRefreshParallelism().
+ *
+ * <p>NOTE(review): despite the name, this passes 7 straight to the CloudSolrClient constructor
+ * via RecordingCloudSolrClient; the Builder's withParallelCacheRefreshes is not exercised.
+ */
+ public void testStateRefreshThreadsConfiguredViaBuilder() throws Exception {
+ String collName = "gettingstarted";
+ AtomicReference<DocCollection> currentDoc = new
AtomicReference<>(loadCollection(collName, 1));
+ Map<String, ClusterState.CollectionRef> refs =
+ Map.of(
+ collName, new TestCollectionRef(currentDoc::get, new
AtomicInteger(), null, null, -1));
+ Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+ try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+ RecordingCloudSolrClient client = new
RecordingCloudSolrClient(provider, 7)) {
+ assertEquals(7, client.getStateRefreshParallelism());
+ }
+ }
+ /**
+ * Two concurrent requests should result in only one state load while that load is in flight.
+ * The collection ref's first get() blocks until released. The default invocation fails the
+ * first two sends with INVALID_STATE and succeeds afterwards.
+ *
+ * <p>NOTE(review): the fetch count is sampled as soon as the first load blocks, so a duplicate
+ * load is only caught if the second request has already reached the refresh by then. Both
+ * threads also share one DummyRequest, whose ModifiableSolrParams the client sets/removes
+ * _stateVer_ on; presumably that params object is not thread-safe, so confirm this cannot flake.
+ */
+ public void testConcurrentRefreshIsDeduplicated() throws Exception {
+ String collName = "gettingstarted";
+ AtomicReference<DocCollection> currentDoc = new
AtomicReference<>(loadCollection(collName, 1));
+ AtomicInteger refGets = new AtomicInteger();
+ CountDownLatch refreshStarted = new CountDownLatch(1);
+ CountDownLatch releaseRefresh = new CountDownLatch(1);
+ Map<String, ClusterState.CollectionRef> refs =
+ Map.of(
+ collName,
+ new TestCollectionRef(currentDoc::get, refGets, refreshStarted,
releaseRefresh, 1));
+ Set<String> liveNodes = new HashSet<>(Set.of("192.168.1.108:8983_solr"));
+
+ try (ClusterStateProvider provider = getStateProvider(liveNodes, refs);
+ RecordingCloudSolrClient client = new
RecordingCloudSolrClient(provider, 2)) {
+ AtomicInteger sendCount = new AtomicInteger();
+ client.setDefaultInvocation(
+ (req, cols) -> {
+ if (sendCount.incrementAndGet() <= 2) {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"stale");
+ }
+ return null;
+ });
+
+ DummyRequest request = new DummyRequest(collName);
+ ExecutorService executor =
+ ExecutorUtil.newMDCAwareFixedThreadPool(
+ 2, new
SolrNamedThreadFactory("CloudSolrClientCacheTest-parallel"));
+ try {
+ Future<NamedList<Object>> first = executor.submit(() ->
client.request(request, collName));
+ Future<NamedList<Object>> second = executor.submit(() ->
client.request(request, collName));
+ // The first ref.get() is now blocked on releaseRefresh (blockAtCount = 1).
+ assertTrue(
+ "Refresh should start within timeout", refreshStarted.await(30,
TimeUnit.SECONDS));
+ assertEquals("Only one refresh should be in flight", 1, refGets.get());
+ releaseRefresh.countDown();
+
+ NamedList<Object> firstResp = first.get(30, TimeUnit.SECONDS);
+ NamedList<Object> secondResp = second.get(30, TimeUnit.SECONDS);
+
+ assertNotNull(firstResp);
+ assertNotNull(secondResp);
+ } finally {
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
+ }
}
}
@@ -159,7 +343,234 @@ public class CloudSolrClientCacheTest extends
SolrTestCaseJ4 {
};
}
- private String coll1State =
+ private DocCollection loadCollection(String collection, int version) throws
Exception {
+ ClusterState state = // parse the COLL1_STATE fixture JSON
+ ClusterState.createFromJson(
+ version, COLL1_STATE.getBytes(UTF_8), Collections.emptySet(), // version is createFromJson's first arg; TODO confirm it becomes the DocCollection's znode version
Instant.now(), null);
+ return state.getCollectionOrNull(collection); // null if the fixture has no such collection
+ }
+ /**
+ * CloudSolrClient test double that never touches the network. Each sendRequest call records the
+ * request's STATE_VERSION param (null if absent), then runs the next queued Invocation, falling
+ * back to the default invocation. With neither, or when an invocation returns null, a canned
+ * responseHeader/status=0 reply is returned. Cluster state comes from the supplied provider.
+ */
+ private static class RecordingCloudSolrClient extends CloudSolrClient
implements AutoCloseable {
+ private final ClusterStateProvider provider;
+ private final ConcurrentLinkedQueue<Invocation> invocations = new
ConcurrentLinkedQueue<>();
+ private volatile Invocation defaultInvocation;
+ private final List<String> stateHistory = Collections.synchronizedList(new
ArrayList<>());
+ private final NamedList<Object> okResponse;
+ // Convenience constructor: updatesToLeaders and parallelUpdates on, directUpdatesToLeadersOnly off.
+ RecordingCloudSolrClient(ClusterStateProvider provider, int
refreshThreads) {
+ this(provider, true, true, false, refreshThreads);
+ }
+
+ RecordingCloudSolrClient(
+ ClusterStateProvider provider,
+ boolean updatesToLeaders,
+ boolean parallelUpdates,
+ boolean directUpdatesToLeadersOnly,
+ int refreshThreads) {
+ super(updatesToLeaders, parallelUpdates, directUpdatesToLeadersOnly,
refreshThreads);
+ this.provider = provider;
+ NamedList<Object> header = new NamedList<>();
+ header.add("status", 0);
+ okResponse = new NamedList<>();
+ okResponse.add("responseHeader", header);
+ }
+ // Queued invocations are consumed FIFO, one per sendRequest call.
+ void enqueue(Invocation invocation) {
+ invocations.add(invocation);
+ }
+
+ void setDefaultInvocation(Invocation invocation) {
+ this.defaultInvocation = invocation;
+ }
+ // Snapshot of the recorded state-version params in send order (null = param absent).
+ List<String> getStateVersionHistory() {
+ synchronized (stateHistory) {
+ return new ArrayList<>(stateHistory);
+ }
+ }
+ // Records the state param, then answers from the queued/default invocation instead of the network.
+ @Override
+ protected NamedList<Object> sendRequest(SolrRequest<?> request,
List<String> inputCollections)
+ throws SolrServerException, IOException {
+ String stateParam =
+ request.getParams() == null ? null :
request.getParams().get(STATE_VERSION);
+ stateHistory.add(stateParam);
+ Invocation invocation = invocations.poll();
+ if (invocation == null) {
+ invocation = defaultInvocation;
+ }
+ if (invocation == null) {
+ return okResponse;
+ }
+ try {
+ NamedList<Object> rsp = invocation.invoke(request, inputCollections);
+ return rsp == null ? okResponse : rsp;
+ } catch (SolrServerException | IOException | SolrException e) { // pass through unchanged
+ throw e;
+ } catch (Exception e) { // any other failure is wrapped
+ throw new SolrServerException(e);
+ }
+ }
+ // Presumably unreachable because sendRequest is overridden; throws if it is ever called.
+ @Override
+ protected LBSolrClient getLbClient() {
+ throw new UnsupportedOperationException("LB client not used in test
harness");
+ }
+
+ @Override
+ public ClusterStateProvider getClusterStateProvider() {
+ return provider;
+ }
+ // Scripted send behavior; may throw to simulate a failed request.
+ @FunctionalInterface
+ interface Invocation {
+ NamedList<Object> invoke(SolrRequest<?> request, List<String>
inputCollections)
+ throws Exception;
+ }
+ }
+ /**
+ * Minimal GET request to "/dummy" for a fixed collection. getParams() always returns the same
+ * ModifiableSolrParams, so the client's set/remove of the state-version param persists on it.
+ */
+ private static class DummyRequest extends SolrRequest<NamedList<Object>> {
+ private final ModifiableSolrParams params = new ModifiableSolrParams();
+ private final String collection;
+
+ DummyRequest(String collection) {
+ super(METHOD.GET, "/dummy", SolrRequestType.UNSPECIFIED);
+ this.collection = collection;
+ }
+
+ @Override
+ public ModifiableSolrParams getParams() {
+ return params;
+ }
+ // No request body.
+ @Override
+ public Collection<ContentStream> getContentStreams() {
+ return null;
+ }
+ // The raw NamedList is the response.
+ @Override
+ protected NamedList<Object> createResponse(NamedList<Object> namedList) {
+ return namedList;
+ }
+
+ @Override
+ public boolean requiresCollection() {
+ return true;
+ }
+
+ @Override
+ public String getCollection() {
+ return collection;
+ }
+
+ @Override
+ public SolrRequestType getRequestType() {
+ return SolrRequestType.UNSPECIFIED;
+ }
+ }
+ /** A DummyRequest that reports SolrRequestType.UPDATE instead of UNSPECIFIED. */
+ private static class DummyUpdateRequest extends DummyRequest {
+ DummyUpdateRequest(String collection) {
+ super(collection);
+ }
+
+ @Override
+ public SolrRequestType getRequestType() {
+ return SolrRequestType.UPDATE;
+ }
+ }
+ /**
+ * Lazily-loaded CollectionRef stub: get() increments {@code counter} and returns the supplier's
+ * current value. On the phaseOneCount-th call it counts down phaseOneReady and waits on
+ * phaseOneProceed; on the phaseTwoCount-th call it does the same with phaseTwoStarted and
+ * phaseTwoProceed. Each phase fires at most once, a non-positive count disables it, and null
+ * latches are skipped. The waits have no timeout, so an unreleased latch blocks the caller.
+ */
+ private static class TestCollectionRef extends ClusterState.CollectionRef {
+ private final Supplier<DocCollection> supplier;
+ private final AtomicInteger counter;
+ private final CountDownLatch phaseOneReady; // Signals COUNT=phaseOneCount
reached
+ private final CountDownLatch phaseOneProceed; // Test signals OK to proceed
+ private final CountDownLatch phaseTwoStarted; // Signals
COUNT=phaseTwoCount blocked
+ private final CountDownLatch phaseTwoProceed; // Test signals OK to
complete
+ private final int phaseOneCount;
+ private final int phaseTwoCount;
+ private final AtomicBoolean phaseOneTriggered = new AtomicBoolean(false);
+ private final AtomicBoolean phaseTwoTriggered = new AtomicBoolean(false);
+
+ // Two-phase constructor for precise control over async refresh ordering
+ TestCollectionRef(
+ Supplier<DocCollection> supplier,
+ AtomicInteger counter,
+ CountDownLatch phaseOneReady,
+ CountDownLatch phaseOneProceed,
+ CountDownLatch phaseTwoStarted,
+ CountDownLatch phaseTwoProceed,
+ int phaseOneCount,
+ int phaseTwoCount) {
+ super(null); // get() is overridden below, so no DocCollection is handed to the parent
+ this.supplier = supplier;
+ this.counter = counter;
+ this.phaseOneReady = phaseOneReady;
+ this.phaseOneProceed = phaseOneProceed;
+ this.phaseTwoStarted = phaseTwoStarted;
+ this.phaseTwoProceed = phaseTwoProceed;
+ this.phaseOneCount = phaseOneCount;
+ this.phaseTwoCount = phaseTwoCount;
+ }
+
+ // Backward-compatible single-block constructor: startLatch/waitLatch/blockAtCount drive phase two; phase one is disabled (-1)
+ TestCollectionRef(
+ Supplier<DocCollection> supplier,
+ AtomicInteger counter,
+ CountDownLatch startLatch,
+ CountDownLatch waitLatch,
+ int blockAtCount) {
+ this(supplier, counter, null, null, startLatch, waitLatch, -1,
blockAtCount);
+ }
+
+ @Override
+ public boolean isLazilyLoaded() {
+ return true;
+ }
+
+ @Override
+ public DocCollection get() {
+ int count = counter.incrementAndGet();
+
+ // Phase 1: Sync point to control async refresh completion timing
+ if (phaseOneCount > 0
+ && count == phaseOneCount
+ && phaseOneTriggered.compareAndSet(false, true)) {
+ if (phaseOneReady != null) {
+ phaseOneReady.countDown();
+ }
+ if (phaseOneProceed != null) {
+ try {
+ phaseOneProceed.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // Phase 2: Block for test verification
+ if (phaseTwoCount > 0
+ && count == phaseTwoCount
+ && phaseTwoTriggered.compareAndSet(false, true)) {
+ if (phaseTwoStarted != null) {
+ phaseTwoStarted.countDown();
+ }
+ if (phaseTwoProceed != null) {
+ try {
+ phaseTwoProceed.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return supplier.get();
+ }
+ }
+
+ private static final String COLL1_STATE =
"{'gettingstarted':{\n"
+ " 'replicationFactor':'2',\n"
+ " 'router':{'name':'compositeId'},\n"