This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new f7aae7b6b8d SOLR-17921: Prefetch liveNodes in BaseHttpClusterStateProvider (#3673)
f7aae7b6b8d is described below
commit f7aae7b6b8dfe70506ab87a9f5e747a4cca13b10
Author: Houston Putman <[email protected]>
AuthorDate: Mon Sep 22 08:53:32 2025 -0700
SOLR-17921: Prefetch liveNodes in BaseHttpClusterStateProvider (#3673)
(cherry picked from commit 5c47042148d817cb0242ae24b2f540e0af2b05d1)
---
solr/CHANGES.txt | 3 +
.../solrj/impl/BaseHttpClusterStateProvider.java | 71 +++++++++++++++++-----
.../solr/client/solrj/impl/CloudSolrClient.java | 11 +---
.../solrj/impl/Http2ClusterStateProvider.java | 1 +
.../solrj/impl/HttpClusterStateProvider.java | 1 +
.../solrj/impl/ClusterStateProviderTest.java | 10 +++
6 files changed, 73 insertions(+), 24 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 49408e0df38..f8fc9da446f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -20,6 +20,9 @@ Improvements
* SOLR-17641: Solr is now able to start on Java 24 and later, but with
Security Manager disabled (Houston Putman, Jan Høydahl)
+* SOLR-17921: SolrJ CloudSolrClient configured with a Solr URL (not ZK) now refreshes liveNodes in the background.
+  This will reduce spikes in request latency when the cached liveNodes have expired. (Houston Putman, David Smiley)
+
Optimizations
---------------------
(No changes)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index b1477a0f45a..3f8310c85b7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -30,6 +30,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
@@ -46,8 +48,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.URLUtil;
-import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +59,7 @@ public abstract class BaseHttpClusterStateProvider implements
ClusterStateProvid
private String urlScheme;
private List<URL> configuredNodes;
volatile Set<String> liveNodes; // initially null then never null
- long liveNodesTimestamp = 0;
+ volatile long liveNodesTimestamp = 0;
volatile Map<String, List<String>> aliases;
volatile Map<String, Map<String, String>> aliasProperties;
long aliasesTimestamp = 0;
@@ -65,6 +67,11 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
// the liveNodes and aliases cache will be invalidated after 5 secs
private int cacheTimeout =
EnvUtils.getPropertyAsInteger("solr.solrj.cache.timeout.sec", 5);
+ volatile boolean liveNodeReloadingScheduled = false;
+ private final ScheduledExecutorService liveNodeReloadingService =
+ Executors.newSingleThreadScheduledExecutor(
+ new SolrNamedThreadFactory("liveNodeReloadingExecutor"));
+
protected void initConfiguredNodes(List<String> solrUrls) throws Exception {
this.configuredNodes =
solrUrls.stream()
@@ -98,14 +105,14 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
@Override
public ClusterState.CollectionRef getState(String collection) {
for (String nodeName : getLiveNodes()) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+ String baseUrl = URLUtil.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
DocCollection docCollection = fetchCollectionState(client, collection);
return new ClusterState.CollectionRef(docCollection);
} catch (SolrServerException | IOException e) {
log.warn(
"Attempt to fetch cluster state from {} failed.",
- Utils.getBaseUrlForNodeName(nodeName, urlScheme),
+ URLUtil.getBaseUrlForNodeName(nodeName, urlScheme),
e);
} catch (RemoteSolrException e) {
if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
@@ -221,25 +228,50 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
}
@Override
- public synchronized Set<String> getLiveNodes() {
-    // synchronized because there's no value in multiple doing this at the same time
+ public Set<String> getLiveNodes() {
+ // Use cached liveNodes if cached and still valid
+ if (liveNodes == null
+        || (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS)
+            > getCacheTimeout())) {
+      // Do synchronized fetch when there is no cached value or the cached value is expired
+ fetchLiveNodes(false);
+ }
+ return liveNodes;
+ }
+
+ private synchronized void fetchLiveNodes(boolean force) {
+ if (!liveNodeReloadingScheduled) {
+ // Method is synchronized, so this is safe
+ liveNodeReloadingScheduled = true;
+ long liveNodeReloadDelayMs = (1000L * getCacheTimeout()) / 2;
+ liveNodeReloadingService.scheduleWithFixedDelay(
+ () -> fetchLiveNodes(true),
+ liveNodeReloadDelayMs,
+ liveNodeReloadDelayMs,
+ TimeUnit.MILLISECONDS);
+ }
// only in the initial state, liveNodes is null
if (liveNodes != null) {
-      if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS)
- <= getCacheTimeout()) {
- return this.liveNodes; // cached copy is fresh enough
+ // Don't use the cached value if force is enabled
+ if (!force
+ && TimeUnit.SECONDS.convert(
+              (System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS)
+          <= getCacheTimeout()) {
+        // Check the cached value again, as the live nodes might have been updated while blocked on
+        // synchronization
+ return; // cached copy is fresh enough
}
if (liveNodes.stream()
.map(node -> URLUtil.getBaseUrlForNodeName(node, urlScheme))
.map(BaseHttpClusterStateProvider::stringToUrl)
- .anyMatch(this::updateLiveNodes)) return this.liveNodes;
+ .anyMatch(this::updateLiveNodes)) return;
      log.warn("Failed fetching live_nodes from {}. Trying configured nodes...", liveNodes);
}
-    if (configuredNodes.stream().anyMatch(this::updateLiveNodes)) return this.liveNodes;
+ if (configuredNodes.stream().anyMatch(this::updateLiveNodes)) return;
throw new RuntimeException(
"Failed fetching live_nodes from "
@@ -288,9 +320,8 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
|| TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp),
TimeUnit.NANOSECONDS)
> getCacheTimeout()) {
for (String nodeName : getLiveNodes()) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+ String baseUrl = URLUtil.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
-
CollectionAdminResponse response =
new CollectionAdminRequest.ListAliases().process(client);
this.aliases = response.getAliasesAsLists();
@@ -335,7 +366,7 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
@Override
public ClusterState getClusterState() {
for (String nodeName : getLiveNodes()) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+ String baseUrl = URLUtil.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
return fetchClusterState(client);
} catch (SolrServerException | RemoteSolrException | IOException e) {
@@ -361,7 +392,7 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
@Override
public Map<String, Object> getClusterProperties() {
for (String nodeName : getLiveNodes()) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+ String baseUrl = URLUtil.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
SimpleOrderedMap<?> cluster =
submitClusterStateRequest(client, null,
ClusterStateRequestType.FETCH_CLUSTER_PROP);
@@ -411,6 +442,16 @@ public abstract class BaseHttpClusterStateProvider
implements ClusterStateProvid
return String.join(",", this.liveNodes);
}
+ @Override
+ public boolean isClosed() {
+ return liveNodeReloadingService.isShutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ liveNodeReloadingService.shutdown();
+ }
+
private enum ClusterStateRequestType {
FETCH_LIVE_NODES,
FETCH_CLUSTER_PROP,
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index b6dbeccfddd..5fca4aef486 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -615,6 +615,7 @@ public abstract class CloudSolrClient extends SolrClient {
DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
+ Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
for (Slice slice : slices) {
String name = slice.getName();
List<Replica> sortedReplicas = new ArrayList<>();
@@ -622,9 +623,7 @@ public abstract class CloudSolrClient extends SolrClient {
if (directUpdatesToLeadersOnly && leader == null) {
for (Replica replica :
slice.getReplicas(
- replica ->
- replica.isActive(getClusterStateProvider().getLiveNodes())
- && replica.getType() == Replica.Type.NRT)) {
+                replica -> replica.isActive(liveNodes) && replica.getType() == Replica.Type.NRT)) {
leader = replica;
break;
}
@@ -846,8 +845,6 @@ public abstract class CloudSolrClient extends SolrClient {
protected NamedList<Object> requestWithRetryOnStaleState(
SolrRequest<?> request, int retryCount, List<String> inputCollections)
throws SolrServerException, IOException {
-    connect(); // important to call this before you start working with the ZkStateReader
-
// build up a _stateVer_ param to pass to the server containing all the
// external collection state versions involved in this request, which
allows
// the server to notify us that our cached state for one or more of the
external
@@ -1061,8 +1058,6 @@ public abstract class CloudSolrClient extends SolrClient {
protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String>
inputCollections)
throws SolrServerException, IOException {
- connect();
-
boolean sendToLeaders = false;
if (request instanceof IsUpdateRequest) {
@@ -1353,8 +1348,6 @@ public abstract class CloudSolrClient extends SolrClient {
* will be only one shard in the return value.
*/
public Map<String, Integer> getShardReplicationFactor(String collection,
NamedList<?> resp) {
- connect();
-
Map<String, Integer> results = new HashMap<>();
if (resp instanceof RouteResponse) {
NamedList<NamedList<?>> routes = ((RouteResponse<?>)
resp).getRouteResponses();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
index a7abb80b71d..cb4226c58d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
@@ -37,6 +37,7 @@ public class Http2ClusterStateProvider extends
BaseHttpClusterStateProvider {
if (this.closeClient && this.httpClient != null) {
httpClient.close();
}
+ super.close();
}
@Override
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
index 353c15fd1b7..2877184bcde 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
@@ -48,5 +48,6 @@ public class HttpClusterStateProvider extends
BaseHttpClusterStateProvider {
if (this.clientIsInternal && this.httpClient != null) {
HttpClientUtil.close(httpClient);
}
+ super.close();
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
index 10ee17201d7..0650da8cafa 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
@@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.impl;
import static org.apache.solr.common.util.URLUtil.getNodeNameForBaseUrl;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import java.io.IOException;
@@ -29,6 +30,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrServerException;
@@ -346,9 +348,17 @@ public class ClusterStateProviderTest extends
SolrCloudTestCase {
cluster.stopJettySolrRunner(jettyNode2);
waitForCSPCacheTimeout();
+ long startTimeNs = System.nanoTime();
actualKnownNodes = cspHttp.getLiveNodes();
+    long liveNodeFetchTimeMs =
+        TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
assertEquals(1, actualKnownNodes.size());
assertEquals(Set.of(nodeName3), actualKnownNodes);
+    // This should already be cached, because it is being updated in the background
+ assertThat(
+ "Cached getLiveNodes() should take no more than 2 milliseconds",
+ liveNodeFetchTimeMs,
+ lessThanOrEqualTo(2L));
// Bring back a backup node and take down the new node
cluster.startJettySolrRunner(jettyNode2, true);