This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 86bc6f29224 SOLR-14070: Deprecate CloudSolrClient ZkHost constructor
(#4533)
86bc6f29224 is described below
commit 86bc6f292245e0566511a9d9c3fb8aaba933e564
Author: David Smiley <[email protected]>
AuthorDate: Thu Jun 18 10:44:17 2026 -0400
SOLR-14070: Deprecate CloudSolrClient ZkHost constructor (#4533)
-- again; was accidentally un-deprecated for 10.0. Oops.
---
.../solr/client/solrj/impl/CloudSolrClient.java | 2935 ++++++++++----------
1 file changed, 1466 insertions(+), 1469 deletions(-)
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 69e5c8d5809..b9b52442297 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -92,25 +92,12 @@ import org.slf4j.MDC;
*/
public abstract class CloudSolrClient extends SolrClient {
+ public static final String STATE_VERSION = "_stateVer_";
+ static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5;
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
// number of times the collection state is reloaded if a stale state error is
received
private static final int MAX_STALE_RETRIES =
Integer.parseInt(System.getProperty("solr.solrj.cloud.max.stale.retries", "5"));
- static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5;
- private final Random rand = new Random();
-
- private final boolean updatesToLeaders;
- private final boolean directUpdatesToLeadersOnly;
- private final RequestReplicaListTransformerGenerator requestRLTGenerator;
- private final boolean parallelUpdates;
- private final ExecutorService threadPool =
- ExecutorUtil.newMDCAwareCachedThreadPool(
- new SolrNamedThreadFactory("CloudSolrClient ThreadPool"));
-
- public static final String STATE_VERSION = "_stateVer_";
- protected long retryExpiryTimeNano =
- TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3
billion nanos
private static final Set<String> NON_ROUTABLE_PARAMS =
Set.of(
UpdateParams.EXPUNGE_DELETES,
@@ -125,1713 +112,1672 @@ public abstract class CloudSolrClient extends
SolrClient {
// Not supported via SolrCloud
// UpdateParams.ROLLBACK
);
-
+ protected final StateCache collectionStateCache = new StateCache();
+ private final Random rand = new Random();
+ private final boolean updatesToLeaders;
+ private final boolean directUpdatesToLeadersOnly;
+ private final RequestReplicaListTransformerGenerator requestRLTGenerator;
+ private final boolean parallelUpdates;
+ private final ExecutorService threadPool =
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ new SolrNamedThreadFactory("CloudSolrClient ThreadPool"));
private final ConcurrentHashMap<String, CompletableFuture<DocCollection>>
collectionRefreshes =
new ConcurrentHashMap<>();
private final Semaphore stateRefreshSemaphore;
private final int stateRefreshParallelism;
+ protected long retryExpiryTimeNano =
+ TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3
billion nanos
private volatile boolean closed;
- /**
- * Constructs {@link CloudSolrClient} instances from provided configuration.
It will use a Jetty
- * based {@code HttpClient} if available, or will otherwise use the JDK.
- */
- public static class Builder {
+ protected CloudSolrClient(
+ boolean updatesToLeaders, boolean parallelUpdates, boolean
directUpdatesToLeadersOnly) {
+ this(
+ updatesToLeaders,
+ parallelUpdates,
+ directUpdatesToLeadersOnly,
+ DEFAULT_STATE_REFRESH_PARALLELISM);
+ }
- protected Collection<String> zkHosts = new ArrayList<>();
- protected List<String> solrUrls = new ArrayList<>();
- protected String zkChroot;
- protected HttpSolrClient httpClient;
- protected boolean shardLeadersOnly = true;
- protected boolean directUpdatesToLeadersOnly = false;
- protected boolean parallelUpdates = true;
- protected ClusterStateProvider stateProvider;
- protected HttpSolrClient.BuilderBase<?, ?> internalClientBuilder;
- protected RequestWriter requestWriter;
- protected ResponseParser responseParser;
- protected long retryExpiryTimeNano =
- TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3
billion nanos
+ protected CloudSolrClient(
+ boolean updatesToLeaders,
+ boolean parallelUpdates,
+ boolean directUpdatesToLeadersOnly,
+ int stateRefreshThreads) {
+ this.updatesToLeaders = updatesToLeaders;
+ this.parallelUpdates = parallelUpdates;
+ this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+ this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
+ this.stateRefreshParallelism = Math.max(1, stateRefreshThreads);
+ this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
+ }
- protected String defaultCollection;
- protected long timeToLiveSeconds = 60;
- protected int parallelCacheRefreshesLocks =
DEFAULT_STATE_REFRESH_PARALLELISM;
- protected int zkConnectTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
- protected int zkClientTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
- protected boolean canUseZkACLs = true;
+ /**
+ * Determines whether an UpdateRequest contains sufficient routing
information to identify shard
+ * leaders for direct updates when directUpdatesToLeadersOnly is enabled.
+ */
+ private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest,
String idField) {
+ final Map<SolrInputDocument, Map<String, Object>> documents =
updateRequest.getDocumentsMap();
+ final Map<String, Map<String, Object>> deleteById =
updateRequest.getDeleteByIdMap();
- /**
- * Provide a series of Solr URLs to be used when configuring {@link
CloudSolrClient} instances.
- * The solr client will use these urls to understand the cluster topology,
which solr nodes are
- * active etc.
- *
- * <p>Provided Solr URLs are expected to point to the root Solr path
- * ("http://hostname:8983/solr"); they should not include any collections,
cores, or other path
- * components.
- *
- * <p>Usage example:
- *
- * <pre>
- * final List<String> solrBaseUrls = new ArrayList<String>();
- * solrBaseUrls.add("http://solr1:8983/solr");
solrBaseUrls.add("http://solr2:8983/solr");
solrBaseUrls.add("http://solr3:8983/solr");
- * final SolrClient client = new
CloudSolrClient.Builder(solrBaseUrls).build();
- * </pre>
- */
- public Builder(List<String> solrUrls) {
- this.solrUrls = solrUrls;
+ final boolean hasNoDocuments = (documents == null || documents.isEmpty());
+ final boolean hasNoDeleteById = (deleteById == null ||
deleteById.isEmpty());
+ if (hasNoDocuments && hasNoDeleteById) {
+ // no documents and no delete-by-id, so no info to find leader(s)
+ return false;
}
- /**
- * Provide a series of ZK hosts which will be used when configuring {@link
CloudSolrClient}
- * instances.
- *
- * <p>Usage example when Solr stores data at the ZooKeeper root ('/'):
- *
- * <pre>
- * final List<String> zkServers = new ArrayList<String>();
- * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181");
zkServers.add("zookeeper3:2181");
- * final SolrClient client = new CloudSolrClient.Builder(zkServers,
Optional.empty()).build();
- * </pre>
- *
- * Usage example when Solr data is stored in a ZooKeeper chroot:
- *
- * <pre>
- * final List<String> zkServers = new ArrayList<String>();
- * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181");
zkServers.add("zookeeper3:2181");
- * final SolrClient client = new CloudSolrClient.Builder(zkServers,
Optional.of("/solr")).build();
- * </pre>
- *
- * @param zkHosts a List of at least one ZooKeeper host and port (e.g.
"zookeeper1:2181")
- * @param zkChroot the path to the root ZooKeeper node containing Solr
data. Provide {@code
- * java.util.Optional.empty()} if no ZK chroot is used.
- */
- public Builder(List<String> zkHosts, Optional<String> zkChroot) {
- this.zkHosts = zkHosts;
- if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
+ if (documents != null) {
+ for (final Map.Entry<SolrInputDocument, Map<String, Object>> entry :
documents.entrySet()) {
+ final SolrInputDocument doc = entry.getKey();
+ final Object fieldValue = doc.getFieldValue(idField);
+ if (fieldValue == null) {
+ // a document with no id field value, so can't find leader for it
+ return false;
+ }
+ }
}
- /** for an expert use-case */
- public Builder(ClusterStateProvider stateProvider) {
- this.stateProvider = stateProvider;
+ if (deleteById != null) {
+ for (final Map.Entry<String, Map<String, Object>> entry :
deleteById.entrySet()) {
+ final Map<String, Object> params = entry.getValue();
+ if (params == null || params.get(ShardParams._ROUTE_) == null) {
+ // deleteById entry lacks explicit route parameter, can't find
leader for it
+ return false;
+ }
+ }
}
- /**
- * Creates a client builder based on a connection string of 2 possible
formats:
- *
- * <ul>
- * <li>ZooKeeper connection string (optionally with chroot), e.g. {@code
- * zk1:2181,zk2:2181,zk3:2181/solr}
- * <li>Comma-separated list of Solr node base URLs (HTTP or HTTPS), e.g.
{@code
- * http://solr1:8983/solr,http://solr2:8983/solr}
- * </ul>
- *
- * @param connectionString a string specifying either ZooKeeper connection
string or HTTP(S)
- * Solr URLs
- * @throws IllegalArgumentException if string is null, empty, or malformed
- */
- public Builder(String connectionString) {
- this(CloudSolrClientConnection.parse(connectionString));
- }
+ return true;
+ }
- /**
- * Creates a client builder from a {@link CloudSolrClientConnection}.
- *
- * @param connection instance of {@link CloudSolrClientConnection}, which
can be obtained from
- * the solr connection string or created via the constructor
- */
- public Builder(CloudSolrClientConnection connection) {
- if (connection.isZookeeper()) {
- this.zkHosts = connection.quorumItems();
- this.zkChroot = connection.zkChroot();
- } else {
- this.solrUrls = connection.quorumItems();
- }
- }
+ protected abstract LBSolrClient getLbClient();
- /** Whether to use the default ZK ACLs when building a ZK Client. */
- public Builder canUseZkACLs(boolean canUseZkACLs) {
- this.canUseZkACLs = canUseZkACLs;
- return this;
- }
+ public abstract ClusterStateProvider getClusterStateProvider();
- /**
- * Tells {@link Builder} that created clients should be configured such
that {@link
- * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
- *
- * @see #sendUpdatesToAnyReplica
- * @see CloudSolrClient#isUpdatesToLeaders
- */
- public Builder sendUpdatesOnlyToShardLeaders() {
- shardLeadersOnly = true;
- return this;
- }
+ /**
+ * @deprecated problematic as a 'get' method, since one implementation will
do a remote request
+ * each time this is called, potentially returning lots of data that isn't
even needed.
+ */
+ @Deprecated
+ public ClusterState getClusterState() {
+ // The future of "ClusterState" isn't clear. Could make it more of a
cache instead of a
+ // snapshot, so we un-deprecate. Or we avoid it and maybe make the
ClusterStateProvider as that
+ // cache. SOLR-17604 is related.
+ return getClusterStateProvider().getClusterState();
+ }
- /**
- * Tells {@link Builder} that created clients should be configured such
that {@link
- * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
- *
- * @see #sendUpdatesOnlyToShardLeaders
- * @see CloudSolrClient#isUpdatesToLeaders
- */
- public Builder sendUpdatesToAnyReplica() {
- shardLeadersOnly = false;
- return this;
- }
+ /** Is this a communication error? We will retry if so. */
+ protected boolean wasCommError(Throwable t) {
+ return t instanceof SocketException || t instanceof UnknownHostException;
+ }
- /**
- * Tells {@link CloudSolrClient.Builder} that created clients should send
direct updates to
- * shard leaders only.
- *
- * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the
client side with a
- * {@link SolrException}
- *
- * @see #sendDirectUpdatesToAnyShardReplica
- * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
- */
- public Builder sendDirectUpdatesToShardLeadersOnly() {
- directUpdatesToLeadersOnly = true;
- return this;
+ @Override
+ public void close() {
+ closed = true;
+ collectionRefreshes.clear();
+ if (!ExecutorUtil.isShutdown(this.threadPool)) {
+ ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
}
+ }
- /**
- * Tells {@link CloudSolrClient.Builder} that created clients can send
updates to any shard
- * replica (shard leaders and non-leaders).
- *
- * <p>Shard leaders are still preferred, but the created clients will fall
back to using other
- * replicas if a leader cannot be found.
- *
- * @see #sendDirectUpdatesToShardLeadersOnly
- * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
- */
- public Builder sendDirectUpdatesToAnyShardReplica() {
- directUpdatesToLeadersOnly = false;
- return this;
- }
-
- /** Provides a {@link RequestWriter} for created clients to use when
handling requests. */
- public Builder withRequestWriter(RequestWriter requestWriter) {
- this.requestWriter = requestWriter;
- return this;
- }
-
- /** Provides a {@link ResponseParser} for created clients to use when
handling requests. */
- public Builder withResponseParser(ResponseParser responseParser) {
- this.responseParser = responseParser;
- return this;
- }
-
- /**
- * Tells {@link CloudSolrClient.Builder} whether created clients should
send shard updates
- * serially or in parallel
- *
- * <p>When an {@link UpdateRequest} affects multiple shards, {@link
CloudSolrClient} splits it
- * up and sends a request to each affected shard. This setting chooses
whether those
- * sub-requests are sent serially or in parallel.
- *
- * <p>If not set, this defaults to 'true' and sends sub-requests in
parallel.
- */
- public Builder withParallelUpdates(boolean parallelUpdates) {
- this.parallelUpdates = parallelUpdates;
- return this;
- }
+ public ResponseParser getParser() {
+ return getLbClient().getParser();
+ }
- /**
- * Configures how many collection state refresh operations may run in
parallel using a dedicated
- * thread pool. This controls the maximum number of concurrent
ZooKeeper/cluster state lookups.
- *
- * <p>Defaults to 5.
- */
- public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks)
{
- this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
- return this;
- }
+ public RequestWriter getRequestWriter() {
+ return getLbClient().getRequestWriter();
+ }
- /**
- * This is the time to wait to re-fetch the state after getting the same
state version from ZK
- */
- public Builder withRetryExpiryTime(long expiryTime, TimeUnit unit) {
- this.retryExpiryTimeNano = TimeUnit.NANOSECONDS.convert(expiryTime,
unit);
- return this;
- }
+ /** Gets whether direct updates are sent in parallel */
+ public boolean isParallelUpdates() {
+ return parallelUpdates;
+ }
- /** Sets the default collection for request. */
- public Builder withDefaultCollection(String defaultCollection) {
- this.defaultCollection = defaultCollection;
- return this;
- }
+ /**
+ * Connect to the zookeeper ensemble. This is an optional method that may be
used to force a
+ * connection before any other requests are sent.
+ *
+ * @deprecated Call {@link ClusterStateProvider#getLiveNodes()} instead.
+ */
+ @Deprecated
+ public void connect() {
+ getClusterStateProvider().connect();
+ }
- /**
- * Sets the cache ttl for DocCollection Objects cached.
- *
- * @param timeToLive ttl value
- */
- public Builder withCollectionCacheTtl(long timeToLive, TimeUnit unit) {
- assert timeToLive > 0;
- this.timeToLiveSeconds = TimeUnit.SECONDS.convert(timeToLive, unit);
- return this;
+ /**
+ * Connect to a cluster. If the cluster is not ready, retry connection up to
a given timeout.
+ *
+ * @param duration the timeout
+ * @param timeUnit the units of the timeout
+ * @throws TimeoutException if the cluster is not ready after the timeout
+ * @throws InterruptedException if the wait is interrupted
+ */
+ @Deprecated
+ public void connect(long duration, TimeUnit timeUnit)
+ throws TimeoutException, InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Waiting for {} {} for cluster at {} to be ready",
+ duration,
+ timeUnit,
+ getClusterStateProvider());
}
-
- /**
- * Set the internal Solr HTTP client.
- *
- * <p>Note: closing the client instance is the responsibility of the
caller.
- *
- * @return this
- */
- public Builder withHttpClient(HttpSolrClient httpSolrClient) {
- if (this.internalClientBuilder != null) {
- throw new IllegalStateException(
- "The builder can't accept an httpClient AND an
internalClientBuilder, only one of those can be provided");
+ long timeout = System.nanoTime() + timeUnit.toNanos(duration);
+ while (System.nanoTime() < timeout) {
+ try {
+ connect();
+ if (log.isInfoEnabled()) {
+ log.info("Cluster at {} ready", getClusterStateProvider());
+ }
+ return;
+ } catch (RuntimeException e) {
+ // not ready yet, then...
}
- this.httpClient = httpSolrClient;
- return this;
+ TimeUnit.MILLISECONDS.sleep(250);
}
+ throw new TimeoutException("Timed out waiting for cluster");
+ }
- /**
- * If provided, the CloudSolrClient will build its internal client using
this builder (instead
- * of the empty default one). Providing this builder allows users to
configure the internal
- * clients (authentication, timeouts, etc.).
- *
- * @param internalClientBuilder the builder to use for creating the
internal http client.
- * @return this
- */
- public Builder withHttpClientBuilder(HttpSolrClient.BuilderBase<?, ?>
internalClientBuilder) {
- if (this.httpClient != null) {
- throw new IllegalStateException(
- "The builder can't accept an httpClient AND an
internalClientBuilder, only one of those can be provided");
- }
- this.internalClientBuilder = internalClientBuilder;
- return this;
- }
+ @SuppressWarnings({"unchecked"})
+ private NamedList<Object> directUpdate(UpdateRequest request, String
collection)
+ throws SolrServerException {
+ SolrParams params = request.getParams();
+ ModifiableSolrParams routableParams = new ModifiableSolrParams();
+ ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
- @Deprecated(since = "9.10")
- public Builder withInternalClientBuilder(
- HttpSolrClient.BuilderBase<?, ?> internalClientBuilder) {
- return withHttpClientBuilder(internalClientBuilder);
+ if (params != null) {
+ nonRoutableParams.add(params);
+ routableParams.add(params);
+ for (String param : NON_ROUTABLE_PARAMS) {
+ routableParams.remove(param);
+ }
+ } else {
+ params = new ModifiableSolrParams();
}
- /**
- * Sets the Zk connection timeout
- *
- * @param zkConnectTimeout timeout value
- * @param unit time unit
- */
- public Builder withZkConnectTimeout(int zkConnectTimeout, TimeUnit unit) {
- this.zkConnectTimeout = Math.toIntExact(unit.toMillis(zkConnectTimeout));
- return this;
+ if (collection == null) {
+ throw new SolrServerException(
+ "No collection param specified on request and no default collection
has been set.");
}
- /**
- * Sets the Zk client session timeout
- *
- * @param zkClientTimeout timeout value
- * @param unit time unit
- */
- public Builder withZkClientTimeout(int zkClientTimeout, TimeUnit unit) {
- this.zkClientTimeout = Math.toIntExact(unit.toMillis(zkClientTimeout));
- return this;
+ // Check to see if the collection is an alias. Updates to multi-collection
aliases are ok as
+ // long as they are routed aliases
+ List<String> aliasedCollections = new
ArrayList<>(resolveAliases(List.of(collection)));
+ if (aliasedCollections.size() == 1 ||
getClusterStateProvider().isRoutedAlias(collection)) {
+ collection = aliasedCollections.get(0); // pick 1st (consistent with
HttpSolrCall behavior)
+ } else {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST,
+ "Update request to non-routed multi-collection alias not supported: "
+ + collection
+ + " -> "
+ + aliasedCollections);
}
- /** Create a {@link CloudSolrClient} based on the provided configuration.
*/
- public CloudHttp2SolrClient build() {
- int providedOptions = 0;
- if (!zkHosts.isEmpty()) providedOptions++;
- if (!solrUrls.isEmpty()) providedOptions++;
- if (stateProvider != null) providedOptions++;
+ DocCollection col = getDocCollection(collection, null);
- if (providedOptions > 1) {
- throw new IllegalArgumentException(
- "Only one of zkHost(s), solrUrl(s), or stateProvider should be
specified.");
- } else if (providedOptions == 0) {
- throw new IllegalArgumentException(
- "One of zkHosts, solrUrls, or stateProvider must be specified.");
- }
+ DocRouter router = col.getRouter();
- return new CloudHttp2SolrClient(this);
+ if (router instanceof ImplicitDocRouter) {
+ // short circuit as optimization
+ return null;
}
- protected HttpSolrClient createOrGetHttpClient() {
- if (httpClient != null) {
- return httpClient;
- } else if (internalClientBuilder != null) {
- return internalClientBuilder.build();
+ ReplicaListTransformer replicaListTransformer =
+ requestRLTGenerator.getReplicaListTransformer(params);
+
+ // Create the URL map, which is keyed on slice name.
+ // The value is a list of URLs for each replica in the slice.
+ // The first value in the list is the leader for the slice.
+ final Map<String, List<String>> urlMap = buildUrlMap(col,
replicaListTransformer);
+ String routeField =
+ (col.getRouter().getRouteField(col) == null) ? ID :
col.getRouter().getRouteField(col);
+ final Map<String, ? extends LBSolrClient.Req> routes =
+ createRoutes(request, routableParams, col, router, urlMap, routeField);
+ if (routes == null) {
+ if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(request,
routeField)) {
+ // we have info (documents with ids and/or ids to delete) with
+ // which to find the leaders, but we could not find (all of) them
+ throw new SolrException(
+ SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "directUpdatesToLeadersOnly==true but could not find leader(s)");
} else {
- return HttpSolrClient.builder(null).build();
+ // we could not find a leader or routes yet - use unoptimized general
path
+ log.warn(
+ "No routing info found for update to collection '{}', broadcasting
to all shards.",
+ collection);
+ return null;
}
}
- protected LBSolrClient createOrGetLbClient(HttpSolrClient myClient) {
- return myClient.createLBSolrClient();
- }
+ final NamedList<Throwable> exceptions = new NamedList<>();
+ final NamedList<NamedList<?>> shardResponses =
+ new NamedList<>(routes.size() + 1); // +1 for deleteQuery
- protected ClusterStateProvider createZkClusterStateProvider() {
- ClusterStateProvider stateProvider =
- ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot,
canUseZkACLs);
- if (stateProvider instanceof
SolrZkClientTimeout.SolrZkClientTimeoutAware timeoutAware) {
- timeoutAware.setZkClientTimeout(zkClientTimeout);
- timeoutAware.setZkConnectTimeout(zkConnectTimeout);
- }
- return stateProvider;
- }
+ long start = System.nanoTime();
- protected ClusterStateProvider
createHttpClusterStateProvider(HttpSolrClient httpClient) {
- try {
- return new HttpClusterStateProvider<>(solrUrls, httpClient);
- } catch (Exception e) {
- throw new RuntimeException(
- "Couldn't initialize a HttpClusterStateProvider (is/are the "
- + "Solr server(s), "
- + solrUrls
- + ", down?)",
- e);
+ if (parallelUpdates) {
+ final Map<String, Future<NamedList<?>>> responseFutures =
+ CollectionUtil.newHashMap(routes.size());
+ for (final Map.Entry<String, ? extends LBSolrClient.Req> entry :
routes.entrySet()) {
+ final String url = entry.getKey();
+ final LBSolrClient.Req lbRequest = entry.getValue();
+ try {
+ MDC.put("CloudSolrClient.url", url);
+ responseFutures.put(
+ url,
+ threadPool.submit(
+ () -> {
+ return getLbClient().request(lbRequest).getResponse();
+ }));
+ } finally {
+ MDC.remove("CloudSolrClient.url");
+ }
}
- }
- }
- protected static class StateCache extends ConcurrentHashMap<String,
ExpiringCachedDocCollection> {
- final AtomicLong puts = new AtomicLong();
- final AtomicLong hits = new AtomicLong();
- final Lock evictLock = new ReentrantLock(true);
- public volatile long timeToLiveMs = 60 * 1000L;
+ for (final Map.Entry<String, Future<NamedList<?>>> entry :
responseFutures.entrySet()) {
+ final String url = entry.getKey();
+ final Future<NamedList<?>> responseFuture = entry.getValue();
+ try {
+ shardResponses.add(url, responseFuture.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ exceptions.add(url, e.getCause());
+ }
+ }
- @Override
- public ExpiringCachedDocCollection get(Object key) {
- ExpiringCachedDocCollection val = super.get(key);
- if (val == null) {
- // a new collection is likely to be added now.
- // check if there are stale items and remove them
- evictStale();
- return null;
+ if (exceptions.size() > 0) {
+ Throwable firstException = exceptions.getVal(0);
+ if (firstException instanceof SolrException e) {
+ throw getRouteException(
+ SolrException.ErrorCode.getErrorCode(e.code()), exceptions,
routes);
+ } else {
+ throw getRouteException(SolrException.ErrorCode.SERVER_ERROR,
exceptions, routes);
+ }
}
- if (val.isExpired(timeToLiveMs)) {
- super.remove(key);
- return null;
+ } else {
+ for (Map.Entry<String, ? extends LBSolrClient.Req> entry :
routes.entrySet()) {
+ String url = entry.getKey();
+ LBSolrClient.Req lbRequest = entry.getValue();
+ try {
+ NamedList<Object> rsp =
getLbClient().request(lbRequest).getResponse();
+ shardResponses.add(url, rsp);
+ } catch (Exception e) {
+ if (e instanceof SolrException) {
+ throw (SolrException) e;
+ } else {
+ throw new SolrServerException(e);
+ }
+ }
}
- hits.incrementAndGet();
- return val;
}
- ExpiringCachedDocCollection peek(Object key) {
- return super.get(key);
+ UpdateRequest nonRoutableRequest = null;
+ List<String> deleteQuery = request.getDeleteQuery();
+ if (deleteQuery != null && deleteQuery.size() > 0) {
+ UpdateRequest deleteQueryRequest = new UpdateRequest();
+ deleteQueryRequest.setDeleteQuery(deleteQuery);
+ nonRoutableRequest = deleteQueryRequest;
}
- @Override
- public ExpiringCachedDocCollection put(String key,
ExpiringCachedDocCollection value) {
- puts.incrementAndGet();
- return super.put(key, value);
- }
+ Set<String> paramNames = nonRoutableParams.getParameterNames();
- void evictStale() {
- if (!evictLock.tryLock()) return;
+ Set<String> intersection = new HashSet<>(paramNames);
+ intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+ if (nonRoutableRequest != null || intersection.size() > 0) {
+ if (nonRoutableRequest == null) {
+ nonRoutableRequest = new UpdateRequest();
+ }
+ nonRoutableRequest.setParams(nonRoutableParams);
+ nonRoutableRequest.setBasicAuthCredentials(
+ request.getBasicAuthUser(), request.getBasicAuthPassword());
+ final var endpoints =
+ routes.keySet().stream()
+ .map(url -> LBSolrClient.Endpoint.from(url))
+ .collect(Collectors.toList());
+ Collections.shuffle(endpoints, rand);
+ LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest,
endpoints);
try {
- for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
- if (e.getValue().isExpired(timeToLiveMs)) {
- super.remove(e.getKey());
- }
- }
- } finally {
- evictLock.unlock();
+ LBSolrClient.Rsp rsp = getLbClient().request(req);
+ shardResponses.add(endpoints.get(0).toString(), rsp.getResponse());
+ } catch (Exception e) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR, endpoints.get(0).toString(),
e);
}
}
- }
- protected final StateCache collectionStateCache = new StateCache();
+ long end = System.nanoTime();
- class ExpiringCachedDocCollection {
- final DocCollection cached;
- final long cachedAtNano;
- // This is the time at which the collection is retried and got the same
old version
- volatile long retriedAtNano = -1;
- // flag that suggests that this is potentially to be rechecked
- volatile boolean maybeStale = false;
+ @SuppressWarnings({"rawtypes"})
+ RouteResponse rr =
+ condenseResponse(
+ shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start,
TimeUnit.NANOSECONDS));
+ rr.setRouteResponses(shardResponses);
+ rr.setRoutes(routes);
+ return rr;
+ }
- ExpiringCachedDocCollection(DocCollection cached) {
- this.cached = cached;
- this.cachedAtNano = System.nanoTime();
- }
+ protected RouteException getRouteException(
+ SolrException.ErrorCode serverError,
+ NamedList<Throwable> exceptions,
+ Map<String, ? extends LBSolrClient.Req> routes) {
+ return new RouteException(serverError, exceptions, routes);
+ }
- boolean isExpired(long timeToLiveMs) {
- return (System.nanoTime() - cachedAtNano)
- > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
- }
+ protected Map<String, ? extends LBSolrClient.Req> createRoutes(
+ UpdateRequest updateRequest,
+ ModifiableSolrParams routableParams,
+ DocCollection col,
+ DocRouter router,
+ Map<String, List<String>> urlMap,
+ String routeField) {
+ return urlMap == null
+ ? null
+ : updateRequest.getRoutesToCollection(router, col, urlMap,
routableParams, routeField);
+ }
- boolean shouldRetry() {
- if (maybeStale) { // we are not sure if it is stale so check with retry
time
- if ((retriedAtNano == -1 || (System.nanoTime() - retriedAtNano) >
retryExpiryTimeNano)) {
- return true; // we retried a while back. and we could not get
anything new.
- // it's likely that it is not going to be available now also.
+ private Map<String, List<String>> buildUrlMap(
+ DocCollection col, ReplicaListTransformer replicaListTransformer) {
+ Map<String, List<String>> urlMap = new HashMap<>();
+ Collection<Slice> slices = col.getActiveSlices();
+ Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
+ for (Slice slice : slices) {
+ String name = slice.getName();
+ List<Replica> sortedReplicas = new ArrayList<>();
+ Replica leader = slice.getLeader();
+ if (directUpdatesToLeadersOnly && leader == null) {
+ for (Replica replica :
+ slice.getReplicas(
+ replica -> replica.isActive(liveNodes) && replica.getType() ==
Replica.Type.NRT)) {
+ leader = replica;
+ break;
}
}
- return false;
- }
-
- void setRetriedAt() {
- retriedAtNano = System.nanoTime();
- }
-
- /**
- * Marks this entry as {@code maybeStale} if the provided backoff window
has elapsed since the
- * last retry.
- *
- * @return {@code true} if the entry was flagged as maybe stale
- */
- boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
- if (maybeStale) {
- return true;
+ if (leader == null) {
+ if (directUpdatesToLeadersOnly) {
+ continue;
+ }
+ // take unoptimized general path - we cannot find a leader yet
+ return null;
}
- long lastRetry = retriedAtNano;
- if (lastRetry != -1 && (System.nanoTime() - lastRetry) <=
retryBackoffNano) {
- return false;
+
+ if (!directUpdatesToLeadersOnly) {
+ for (Replica replica : slice.getReplicas()) {
+ if (!replica.equals(leader)) {
+ sortedReplicas.add(replica);
+ }
+ }
}
- maybeStale = true;
- return true;
- }
- }
- protected CloudSolrClient(
- boolean updatesToLeaders, boolean parallelUpdates, boolean
directUpdatesToLeadersOnly) {
- this(
- updatesToLeaders,
- parallelUpdates,
- directUpdatesToLeadersOnly,
- DEFAULT_STATE_REFRESH_PARALLELISM);
- }
+ // Sort the non-leader replicas according to the request parameters
+ replicaListTransformer.transform(sortedReplicas);
- protected CloudSolrClient(
- boolean updatesToLeaders,
- boolean parallelUpdates,
- boolean directUpdatesToLeadersOnly,
- int stateRefreshThreads) {
- this.updatesToLeaders = updatesToLeaders;
- this.parallelUpdates = parallelUpdates;
- this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
- this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
- this.stateRefreshParallelism = Math.max(1, stateRefreshThreads);
- this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
+ // put the leaderUrl first.
+ sortedReplicas.add(0, leader);
+
+ urlMap.put(
+ name,
sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
+ }
+ return urlMap;
}
- protected abstract LBSolrClient getLbClient();
+ protected <T extends RouteResponse<?>> T condenseResponse(
+ NamedList<?> response, int timeMillis, Supplier<T> supplier) {
+ T condensed = supplier.get();
+ int status = 0;
+ Integer rf = null;
- public abstract ClusterStateProvider getClusterStateProvider();
+ // TolerantUpdateProcessor
+ List<SimpleOrderedMap<String>> toleratedErrors = null;
+ int maxToleratedErrors = Integer.MAX_VALUE;
- /**
- * @deprecated problematic as a 'get' method, since one implementation will
do a remote request
- * each time this is called, potentially return lots of data that isn't
even needed.
- */
- @Deprecated
- public ClusterState getClusterState() {
- // The future of "ClusterState" isn't clear. Could make it more of a
cache instead of a
- // snapshot, so we un-deprecate. Or we avoid it and maybe make the
ClusterStateProvider as that
- // cache. SOLR-17604 is related.
- return getClusterStateProvider().getClusterState();
- }
+ // For "adds", "deletes", "deleteByQuery" etc.
+ Map<String, NamedList<Object>> versions = new HashMap<>();
- /** Is this a communication error? We will retry if so. */
- protected boolean wasCommError(Throwable t) {
- return t instanceof SocketException || t instanceof UnknownHostException;
- }
+ for (int i = 0; i < response.size(); i++) {
+ NamedList<?> shardResponse = (NamedList<?>) response.getVal(i);
+ NamedList<?> header = (NamedList<?>) shardResponse.get("responseHeader");
+ Integer shardStatus = (Integer) header.get("status");
+ int s = shardStatus.intValue();
+ if (s > 0) {
+ status = s;
+ }
+ Object rfObj = header.get(UpdateRequest.REPFACT);
+ if (rfObj != null && rfObj instanceof Integer routeRf) {
+ if (rf == null || routeRf < rf) rf = routeRf;
+ }
- @Override
- public void close() {
- closed = true;
- collectionRefreshes.clear();
- if (!ExecutorUtil.isShutdown(this.threadPool)) {
- ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
+ @SuppressWarnings("unchecked")
+ List<SimpleOrderedMap<String>> shardTolerantErrors =
+ (List<SimpleOrderedMap<String>>) header.get("errors");
+ if (null != shardTolerantErrors) {
+ Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+ assert null != shardMaxToleratedErrors
+ : "TolerantUpdateProcessor reported errors but not maxErrors";
+ // if we get into some weird state where the nodes disagree about the
effective maxErrors,
+ // assume the min value seen to decide if we should fail.
+ maxToleratedErrors =
+ Math.min(
+ maxToleratedErrors,
+
ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
+ if (null == toleratedErrors) {
+ toleratedErrors = new
ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+ }
+ for (SimpleOrderedMap<String> err : shardTolerantErrors) {
+ toleratedErrors.add(err);
+ }
+ }
+ for (String updateType : Arrays.asList("adds", "deletes",
"deleteByQuery")) {
+ Object obj = shardResponse.get(updateType);
+ if (obj instanceof NamedList<?> nl) {
+ NamedList<Object> versionsList =
+ versions.containsKey(updateType) ? versions.get(updateType) :
new NamedList<>();
+ versionsList.addAll(nl);
+ versions.put(updateType, versionsList);
+ }
+ }
}
- }
- public ResponseParser getParser() {
- return getLbClient().getParser();
- }
+ NamedList<Object> cheader = new NamedList<>();
+ cheader.add("status", status);
+ cheader.add("QTime", timeMillis);
+ if (rf != null) cheader.add(UpdateRequest.REPFACT, rf);
+ if (null != toleratedErrors) {
+ cheader.add("maxErrors",
ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+ cheader.add("errors", toleratedErrors);
+ if (maxToleratedErrors < toleratedErrors.size()) {
+ // cumulative errors are too high, we need to throw a client exception
w/correct metadata
- public RequestWriter getRequestWriter() {
- return getLbClient().getRequestWriter();
+ // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(),
because if that were the
+ // case then at least one shard should have thrown a real error before
this, so we don't
+ // worry about having a more "singular" exception msg for that
situation
+ StringBuilder msgBuf =
+ new StringBuilder()
+ .append(toleratedErrors.size())
+ .append(" Async failures during distributed update: ");
+
+ NamedList<String> metadata = new NamedList<>();
+ for (SimpleOrderedMap<String> err : toleratedErrors) {
+ ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+ metadata.add(te.getMetadataKey(), te.getMetadataValue());
+
+ msgBuf.append("\n").append(te.getMessage());
+ }
+
+ SolrException toThrow =
+ new SolrException(SolrException.ErrorCode.BAD_REQUEST,
msgBuf.toString());
+ toThrow.setMetadata(metadata);
+ throw toThrow;
+ }
+ }
+ for (Map.Entry<String, NamedList<Object>> entry : versions.entrySet()) {
+ condensed.add(entry.getKey(), entry.getValue());
+ }
+ condensed.add("responseHeader", cheader);
+ return condensed;
}
- /** Gets whether direct updates are sent in parallel */
- public boolean isParallelUpdates() {
- return parallelUpdates;
+ @SuppressWarnings({"rawtypes"})
+ public RouteResponse condenseResponse(NamedList<?> response, int timeMillis)
{
+ return condenseResponse(response, timeMillis, RouteResponse::new);
}
- /**
- * Connect to the zookeeper ensemble. This is an optional method that may be
used to force a
- * connection before any other requests are sent.
- *
- * @deprecated Call {@link ClusterStateProvider#getLiveNodes()} instead.
- */
- @Deprecated
- public void connect() {
- getClusterStateProvider().connect();
+ @Override
+ public NamedList<Object> request(SolrRequest<?> request, String collection)
+ throws SolrServerException, IOException {
+ // the collection parameter of the request overrides that of the parameter
to this method
+ String requestCollection = request.getCollection();
+ if (requestCollection != null) {
+ collection = requestCollection;
+ } else if (collection == null) {
+ collection = defaultCollection;
+ }
+
+ List<String> inputCollections =
+ collection == null ? List.of() : StrUtils.splitSmart(collection, ",",
true);
+ return requestWithRetryOnStaleState(
+ request,
+ 0,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ false);
}
/**
- * Connect to a cluster. If the cluster is not ready, retry connection up to
a given timeout.
- *
- * @param duration the timeout
- * @param timeUnit the units of the timeout
- * @throws TimeoutException if the cluster is not ready after the timeout
- * @throws InterruptedException if the wait is interrupted
+ * As this class doesn't watch external collections on the client side, there's a chance that the
+ * request will fail due to cached stale state, which means the state must be refreshed from ZK
+ * and the request retried.
*/
- @Deprecated
- public void connect(long duration, TimeUnit timeUnit)
- throws TimeoutException, InterruptedException {
- if (log.isInfoEnabled()) {
- log.info(
- "Waiting for {} {} for cluster at {} to be ready",
- duration,
- timeUnit,
- getClusterStateProvider());
+ protected NamedList<Object> requestWithRetryOnStaleState(
+ SolrRequest<?> request,
+ int retryCount,
+ List<String> inputCollections,
+ boolean skipStateVersion,
+ Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
+ boolean waitedForRefresh)
+ throws SolrServerException, IOException {
+ // build up a _stateVer_ param to pass to the server containing all the
+ // external collection state versions involved in this request, which
allows
+ // the server to notify us that our cached state for one or more of the
external
+ // collections is stale and needs to be refreshed ... this code has no
impact on internal
+ // collections
+ String stateVerParam = null;
+ List<DocCollection> requestedCollections = null;
+ boolean isCollectionRequestOfV2 = false;
+ if (request instanceof V2Request) {
+ isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
}
- long timeout = System.nanoTime() + timeUnit.toNanos(duration);
- while (System.nanoTime() < timeout) {
- try {
- connect();
- if (log.isInfoEnabled()) {
- log.info("Cluster at {} ready", getClusterStateProvider());
+ boolean isAdmin =
+ request.getRequestType() == SolrRequestType.ADMIN &&
!request.requiresCollection();
+ if (!inputCollections.isEmpty()
+ && !isAdmin
+ && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for
admin, v2 api requests
+ Set<String> requestedCollectionNames = resolveAliases(inputCollections);
+
+ StringBuilder stateVerParamBuilder = null;
+ for (String requestedCollection : requestedCollectionNames) {
+ // track the version of state we're using on the client side using the
_stateVer_ param
+ DocCollection coll = getDocCollection(requestedCollection, null);
+ if (coll == null) {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " +
requestedCollection);
}
- return;
- } catch (RuntimeException e) {
- // not ready yet, then...
- }
- TimeUnit.MILLISECONDS.sleep(250);
- }
- throw new TimeoutException("Timed out waiting for cluster");
- }
+ int collVer = coll.getZNodeVersion();
+ if (requestedCollections == null)
+ requestedCollections = new
ArrayList<>(requestedCollectionNames.size());
+ requestedCollections.add(coll);
- @SuppressWarnings({"unchecked"})
- private NamedList<Object> directUpdate(UpdateRequest request, String
collection)
- throws SolrServerException {
- SolrParams params = request.getParams();
- ModifiableSolrParams routableParams = new ModifiableSolrParams();
- ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+ if (stateVerParamBuilder == null) {
+ stateVerParamBuilder = new StringBuilder();
+ } else {
+ stateVerParamBuilder.append(
+ "|"); // hopefully pipe is not an allowed char in a collection
name
+ }
- if (params != null) {
- nonRoutableParams.add(params);
- routableParams.add(params);
- for (String param : NON_ROUTABLE_PARAMS) {
- routableParams.remove(param);
+
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
}
- } else {
- params = new ModifiableSolrParams();
- }
- if (collection == null) {
- throw new SolrServerException(
- "No collection param specified on request and no default collection
has been set.");
+ if (stateVerParamBuilder != null) {
+ stateVerParam = stateVerParamBuilder.toString();
+ }
}
- // Check to see if the collection is an alias. Updates to multi-collection
aliases are ok as
- // long as they are routed aliases
- List<String> aliasedCollections = new
ArrayList<>(resolveAliases(List.of(collection)));
- if (aliasedCollections.size() == 1 ||
getClusterStateProvider().isRoutedAlias(collection)) {
- collection = aliasedCollections.get(0); // pick 1st (consistent with
HttpSolrCall behavior)
- } else {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST,
- "Update request to non-routed multi-collection alias not supported: "
- + collection
- + " -> "
- + aliasedCollections);
- }
-
- DocCollection col = getDocCollection(collection, null);
-
- DocRouter router = col.getRouter();
-
- if (router instanceof ImplicitDocRouter) {
- // short circuit as optimization
- return null;
- }
-
- ReplicaListTransformer replicaListTransformer =
- requestRLTGenerator.getReplicaListTransformer(params);
-
- // Create the URL map, which is keyed on slice name.
- // The value is a list of URLs for each replica in the slice.
- // The first value in the list is the leader for the slice.
- final Map<String, List<String>> urlMap = buildUrlMap(col,
replicaListTransformer);
- String routeField =
- (col.getRouter().getRouteField(col) == null) ? ID :
col.getRouter().getRouteField(col);
- final Map<String, ? extends LBSolrClient.Req> routes =
- createRoutes(request, routableParams, col, router, urlMap, routeField);
- if (routes == null) {
- if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(request,
routeField)) {
- // we have info (documents with ids and/or ids to delete) with
- // which to find the leaders, but we could not find (all of) them
- throw new SolrException(
- SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "directUpdatesToLeadersOnly==true but could not find leader(s)");
+ if (request.getParams() instanceof ModifiableSolrParams params) {
+ if (!skipStateVersion && stateVerParam != null) {
+ params.set(STATE_VERSION, stateVerParam);
} else {
- // we could not find a leader or routes yet - use unoptimized general
path
- log.warn(
- "No routing info found for update to collection '{}', broadcasting
to all shards.",
- collection);
- return null;
+ params.remove(STATE_VERSION);
}
- }
-
- final NamedList<Throwable> exceptions = new NamedList<>();
- final NamedList<NamedList<?>> shardResponses =
- new NamedList<>(routes.size() + 1); // +1 for deleteQuery
+ } // else: ??? how to set this ???
- long start = System.nanoTime();
+ NamedList<Object> resp = null;
+ try {
+ resp = sendRequest(request, inputCollections);
+ // to avoid an O(n) operation we always add STATE_VERSION as the last entry and try to read
+ // it from there
+ Object o = resp == null || resp.size() == 0 ? null :
resp.get(STATE_VERSION, resp.size() - 1);
+ if (o != null && o instanceof Map<?, ?> invalidStates) {
+ // remove this because no one else needs this and tests would fail if
they are comparing
+ // responses
+ resp.remove(resp.size() - 1);
+ for (Map.Entry<?, ?> e : invalidStates.entrySet()) {
+ getDocCollection((String) e.getKey(), (Integer) e.getValue());
+ }
+ }
+ } catch (Exception exc) {
- if (parallelUpdates) {
- final Map<String, Future<NamedList<?>>> responseFutures =
- CollectionUtil.newHashMap(routes.size());
- for (final Map.Entry<String, ? extends LBSolrClient.Req> entry :
routes.entrySet()) {
- final String url = entry.getKey();
- final LBSolrClient.Req lbRequest = entry.getValue();
- try {
- MDC.put("CloudSolrClient.url", url);
- responseFutures.put(
- url,
- threadPool.submit(
- () -> {
- return getLbClient().request(lbRequest).getResponse();
- }));
- } finally {
- MDC.remove("CloudSolrClient.url");
+ Throwable rootCause = SolrException.getRootCause(exc);
+ // don't do retry support for admin requests
+ // or if the request doesn't have a collection specified
+ // or request is v2 api and its method is not GET
+ if (inputCollections.isEmpty()
+ || isAdmin
+ || (request.getApiVersion() == SolrRequest.ApiVersion.V2
+ && request.getMethod() != SolrRequest.METHOD.GET)) {
+ if (exc instanceof SolrServerException) {
+ throw (SolrServerException) exc;
+ } else if (exc instanceof IOException) {
+ throw (IOException) exc;
+ } else if (exc instanceof RuntimeException) {
+ throw (RuntimeException) exc;
+ } else {
+ throw new SolrServerException(rootCause);
}
}
- for (final Map.Entry<String, Future<NamedList<?>>> entry :
responseFutures.entrySet()) {
- final String url = entry.getKey();
- final Future<NamedList<?>> responseFuture = entry.getValue();
- try {
- shardResponses.add(url, responseFuture.get());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- exceptions.add(url, e.getCause());
+ int errorCode =
+ (rootCause instanceof SolrException)
+ ? ((SolrException) rootCause).code()
+ : SolrException.ErrorCode.UNKNOWN.code;
+
+ final boolean wasCommError = wasCommError(rootCause);
+
+ if (wasCommError
+ || (exc instanceof RouteException
+ && (errorCode == 503)) // 503 service unavailable; 404 (missing core) handled below
+ // TODO there are other reasons for 404. We need to change the solr
response format from HTML
+ // to structured data to know that
+ ) {
+ // It was a communication error. It is likely that
+ // the node to which the request was sent is down, so expire the state
+ // so that the next attempt fetches the fresh state.
+ // Just re-read state for all of them, if it has not been retried
+ // within the retryExpiryTime window.
+ if (requestedCollections != null) {
+ for (DocCollection ext : requestedCollections) {
+ String name = ext.getName();
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(name);
+ if (cacheEntry != null) {
+ if (wasCommError) {
+ cacheEntry.maybeStale = true;
+ } else {
+ boolean markedStale =
+
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
+ if (markedStale && cacheEntry.shouldRetry()) {
+ triggerCollectionRefresh(name);
+ }
+ }
+ } else {
+ triggerCollectionRefresh(name);
+ }
+ }
+ }
+ if (retryCount < MAX_STALE_RETRIES) { // if it is a communication error, we must try again
+ // Maybe we have a stale version of the collection state,
+ // and we could not get any information from the server.
+ // It is probably not worth trying again and again, because
+ // the state would not have been updated.
+ log.info(
+ "Request to collection {} failed due to ({}) {}, retry={}
maxRetries={} commError={} errorCode={} - retrying",
+ inputCollections,
+ errorCode,
+ rootCause,
+ retryCount,
+ MAX_STALE_RETRIES,
+ wasCommError,
+ errorCode);
+ return requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ skipStateVersion,
+ pendingRefreshes,
+ waitedForRefresh);
}
+ } else {
+ log.info("request was not communication error it seems");
}
+ log.info(
+ "Request to collection {} failed due to ({}) {}, retry={}
maxRetries={} commError={} errorCode={} ",
+ inputCollections,
+ errorCode,
+ rootCause,
+ retryCount,
+ MAX_STALE_RETRIES,
+ wasCommError,
+ errorCode);
- if (exceptions.size() > 0) {
- Throwable firstException = exceptions.getVal(0);
- if (firstException instanceof SolrException e) {
- throw getRouteException(
- SolrException.ErrorCode.getErrorCode(e.code()), exceptions,
routes);
- } else {
- throw getRouteException(SolrException.ErrorCode.SERVER_ERROR,
exceptions, routes);
+ boolean stateWasStale = false;
+ if (retryCount < MAX_STALE_RETRIES
+ && requestedCollections != null
+ && !requestedCollections.isEmpty()
+ && (SolrException.ErrorCode.getErrorCode(errorCode)
+ == SolrException.ErrorCode.INVALID_STATE
+ || errorCode == 404)) {
+ // cached state for one or more external collections was stale
+ // re-issue request using updated state
+ stateWasStale = true;
+
+ // just re-read state for all of them, which is a little heavy-handed
but hopefully a rare
+ // occurrence
+ for (DocCollection ext : requestedCollections) {
+ collectionStateCache.remove(ext.getName());
}
}
- } else {
- for (Map.Entry<String, ? extends LBSolrClient.Req> entry :
routes.entrySet()) {
- String url = entry.getKey();
- LBSolrClient.Req lbRequest = entry.getValue();
- try {
- NamedList<Object> rsp =
getLbClient().request(lbRequest).getResponse();
- shardResponses.add(url, rsp);
- } catch (Exception e) {
- if (e instanceof SolrException) {
- throw (SolrException) e;
- } else {
- throw new SolrServerException(e);
+
+ // if we experienced a communication error, it's worth checking the state
+ // with ZK just to make sure the node we're trying to hit is still part
of the collection
+ if (retryCount < MAX_STALE_RETRIES
+ && !stateWasStale
+ && requestedCollections != null
+ && !requestedCollections.isEmpty()
+ && wasCommError) {
+ for (DocCollection ext : requestedCollections) {
+ DocCollection latestStateFromZk = getDocCollection(ext.getName(),
null);
+ if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+ // looks like we couldn't reach the server because the state was
stale == retry
+ stateWasStale = true;
+ // we just pulled state from ZK, so update the cache so that the
retry uses it
+ collectionStateCache.put(
+ ext.getName(), new
ExpiringCachedDocCollection(latestStateFromZk));
}
}
}
- }
- UpdateRequest nonRoutableRequest = null;
- List<String> deleteQuery = request.getDeleteQuery();
- if (deleteQuery != null && deleteQuery.size() > 0) {
- UpdateRequest deleteQueryRequest = new UpdateRequest();
- deleteQueryRequest.setDeleteQuery(deleteQuery);
- nonRoutableRequest = deleteQueryRequest;
- }
-
- Set<String> paramNames = nonRoutableParams.getParameterNames();
+ // if the state was stale, then we retry the request once with new state
pulled from Zk
+ if (stateWasStale) {
+ log.warn(
+ "Re-trying request to collection(s) {} after stale state error
from server.",
+ inputCollections);
- Set<String> intersection = new HashSet<>(paramNames);
- intersection.retainAll(NON_ROUTABLE_PARAMS);
+ Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor =
pendingRefreshes;
+ if (!waitedForRefresh && (pendingRefreshes == null ||
pendingRefreshes.isEmpty())) {
+ refreshesToWaitFor = new HashMap<>();
+ for (DocCollection ext : requestedCollections) {
+ refreshesToWaitFor.put(ext.getName(),
triggerCollectionRefresh(ext.getName()));
+ }
+ }
- if (nonRoutableRequest != null || intersection.size() > 0) {
- if (nonRoutableRequest == null) {
- nonRoutableRequest = new UpdateRequest();
- }
- nonRoutableRequest.setParams(nonRoutableParams);
- nonRoutableRequest.setBasicAuthCredentials(
- request.getBasicAuthUser(), request.getBasicAuthPassword());
- final var endpoints =
- routes.keySet().stream()
- .map(url -> LBSolrClient.Endpoint.from(url))
- .collect(Collectors.toList());
- Collections.shuffle(endpoints, rand);
- LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest,
endpoints);
- try {
- LBSolrClient.Rsp rsp = getLbClient().request(req);
- shardResponses.add(endpoints.get(0).toString(), rsp.getResponse());
- } catch (Exception e) {
- throw new SolrException(
- SolrException.ErrorCode.SERVER_ERROR, endpoints.get(0).toString(),
e);
+ // First retry without sending state versions so the server does not
immediately reject the
+ // request while we intentionally rely on stale routing (e.g., to
allow forwarding to a new
+ // leader) as the background refresh completes.
+ if (!skipStateVersion && !waitedForRefresh) {
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ true,
+ refreshesToWaitFor,
+ waitedForRefresh);
+ } else if (!waitedForRefresh
+ && refreshesToWaitFor != null
+ && !refreshesToWaitFor.isEmpty()) {
+ for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
+ refreshesToWaitFor.entrySet()) {
+ waitForCollectionRefresh(entry.getKey(), entry.getValue());
+ }
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ true);
+ } else {
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ false,
+ Map.of(),
+ /*waitedForRefresh*/ waitedForRefresh);
+ }
+ } else {
+ if (exc instanceof SolrException
+ || exc instanceof SolrServerException
+ || exc instanceof IOException) {
+ throw exc;
+ } else {
+ throw new SolrServerException(rootCause);
+ }
}
- }
- long end = System.nanoTime();
+ if (requestedCollections != null) {
+ requestedCollections.clear(); // done with this
+ }
+ }
- @SuppressWarnings({"rawtypes"})
- RouteResponse rr =
- condenseResponse(
- shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start,
TimeUnit.NANOSECONDS));
- rr.setRouteResponses(shardResponses);
- rr.setRoutes(routes);
- return rr;
+ return resp;
}
- protected RouteException getRouteException(
- SolrException.ErrorCode serverError,
- NamedList<Throwable> exceptions,
- Map<String, ? extends LBSolrClient.Req> routes) {
- return new RouteException(serverError, exceptions, routes);
- }
+ protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String>
inputCollections)
+ throws SolrServerException, IOException {
+ boolean sendToLeaders = false;
- protected Map<String, ? extends LBSolrClient.Req> createRoutes(
- UpdateRequest updateRequest,
- ModifiableSolrParams routableParams,
- DocCollection col,
- DocRouter router,
- Map<String, List<String>> urlMap,
- String routeField) {
- return urlMap == null
- ? null
- : updateRequest.getRoutesToCollection(router, col, urlMap,
routableParams, routeField);
- }
+ if (request.getRequestType() == SolrRequestType.UPDATE) {
+ sendToLeaders = this.isUpdatesToLeaders();
- private Map<String, List<String>> buildUrlMap(
- DocCollection col, ReplicaListTransformer replicaListTransformer) {
- Map<String, List<String>> urlMap = new HashMap<>();
- Collection<Slice> slices = col.getActiveSlices();
- Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
- for (Slice slice : slices) {
- String name = slice.getName();
- List<Replica> sortedReplicas = new ArrayList<>();
- Replica leader = slice.getLeader();
- if (directUpdatesToLeadersOnly && leader == null) {
- for (Replica replica :
- slice.getReplicas(
- replica -> replica.isActive(liveNodes) && replica.getType() ==
Replica.Type.NRT)) {
- leader = replica;
- break;
- }
- }
- if (leader == null) {
- if (directUpdatesToLeadersOnly) {
- continue;
- }
- // take unoptimized general path - we cannot find a leader yet
- return null;
- }
+ if (sendToLeaders && request instanceof UpdateRequest updateRequest) {
+ sendToLeaders = sendToLeaders && updateRequest.isSendToLeaders();
- if (!directUpdatesToLeadersOnly) {
- for (Replica replica : slice.getReplicas()) {
- if (!replica.equals(leader)) {
- sortedReplicas.add(replica);
+ // Check if we can do a "directUpdate" ...
+ if (sendToLeaders) {
+ if (inputCollections.size() > 1) {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST,
+ "Update request must be sent to a single collection "
+ + "or an alias: "
+ + inputCollections);
+ }
+ String collection =
+ inputCollections.isEmpty()
+ ? null
+ : inputCollections.get(0); // getting first mimics
HttpSolrCall
+ NamedList<Object> response = directUpdate(updateRequest, collection);
+ if (response != null) {
+ return response;
}
}
}
+ }
- // Sort the non-leader replicas according to the request parameters
- replicaListTransformer.transform(sortedReplicas);
-
- // put the leaderUrl first.
- sortedReplicas.add(0, leader);
+ SolrParams reqParams = request.getParams();
+ assert reqParams != null;
- urlMap.put(
- name,
sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
- }
- return urlMap;
- }
+ ReplicaListTransformer replicaListTransformer =
+ requestRLTGenerator.getReplicaListTransformer(reqParams);
- protected <T extends RouteResponse<?>> T condenseResponse(
- NamedList<?> response, int timeMillis, Supplier<T> supplier) {
- T condensed = supplier.get();
- int status = 0;
- Integer rf = null;
+ final ClusterStateProvider provider = getClusterStateProvider();
+ final String urlScheme = provider.getUrlScheme();
+ final Set<String> liveNodes = provider.getLiveNodes();
- // TolerantUpdateProcessor
- List<SimpleOrderedMap<String>> toleratedErrors = null;
- int maxToleratedErrors = Integer.MAX_VALUE;
+ final List<LBSolrClient.Endpoint> requestEndpoints =
+ new ArrayList<>(); // we populate this as follows...
- // For "adds", "deletes", "deleteByQuery" etc.
- Map<String, NamedList<Object>> versions = new HashMap<>();
+ if (request.getApiVersion() == SolrRequest.ApiVersion.V2) {
+ if (!liveNodes.isEmpty()) {
+ List<String> liveNodesList = new ArrayList<>(liveNodes);
+ Collections.shuffle(liveNodesList, rand);
+ final var chosenNodeUrl =
Utils.getBaseUrlForNodeName(liveNodesList.get(0), urlScheme);
+ requestEndpoints.add(new LBSolrClient.Endpoint(chosenNodeUrl));
+ }
- for (int i = 0; i < response.size(); i++) {
- NamedList<?> shardResponse = (NamedList<?>) response.getVal(i);
- NamedList<?> header = (NamedList<?>) shardResponse.get("responseHeader");
- Integer shardStatus = (Integer) header.get("status");
- int s = shardStatus.intValue();
- if (s > 0) {
- status = s;
+ } else if (!request.requiresCollection()) {
+ for (String liveNode : liveNodes) {
+ final var nodeBaseUrl = Utils.getBaseUrlForNodeName(liveNode,
urlScheme);
+ requestEndpoints.add(new LBSolrClient.Endpoint(nodeBaseUrl));
}
- Object rfObj = header.get(UpdateRequest.REPFACT);
- if (rfObj != null && rfObj instanceof Integer routeRf) {
- if (rf == null || routeRf < rf) rf = routeRf;
+ } else { // API call to a particular collection / core / alias (i.e.
+ // request.requiresCollection() == true)
+ Set<String> collectionNames = resolveAliases(inputCollections);
+ if (collectionNames.isEmpty()) {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST,
+ "No collection param specified on request and no default
collection has been set: "
+ + inputCollections);
}
- @SuppressWarnings("unchecked")
- List<SimpleOrderedMap<String>> shardTolerantErrors =
- (List<SimpleOrderedMap<String>>) header.get("errors");
- if (null != shardTolerantErrors) {
- Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
- assert null != shardMaxToleratedErrors
- : "TolerantUpdateProcessor reported errors but not maxErrors";
- // if we get into some weird state where the nodes disagree about the
effective maxErrors,
- // assume the min value seen to decide if we should fail.
- maxToleratedErrors =
- Math.min(
- maxToleratedErrors,
-
ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-
- if (null == toleratedErrors) {
- toleratedErrors = new
ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
- }
- for (SimpleOrderedMap<String> err : shardTolerantErrors) {
- toleratedErrors.add(err);
+ List<String> preferredNodes = request.getPreferredNodes();
+ if (preferredNodes != null && !preferredNodes.isEmpty()) {
+ String joinedInputCollections = StrUtils.join(inputCollections, ',');
+ final var endpoints =
+ preferredNodes.stream()
+ .map(nodeName -> Utils.getBaseUrlForNodeName(nodeName,
urlScheme))
+ .map(nodeUrl -> new LBSolrClient.Endpoint(nodeUrl,
joinedInputCollections))
+ .collect(Collectors.toList());
+ if (!endpoints.isEmpty()) {
+ LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
+ LBSolrClient.Rsp rsp = getLbClient().request(req);
+ return rsp.getResponse();
}
}
- for (String updateType : Arrays.asList("adds", "deletes",
"deleteByQuery")) {
- Object obj = shardResponse.get(updateType);
- if (obj instanceof NamedList<?> nl) {
- NamedList<Object> versionsList =
- versions.containsKey(updateType) ? versions.get(updateType) :
new NamedList<>();
- versionsList.addAll(nl);
- versions.put(updateType, versionsList);
+
+ // TODO: not a big deal because of the caching, but we could avoid
looking
+ // at every shard when getting leaders if we tweaked some things
+
+ // Retrieve slices from the cloud state and, for each collection
specified, add it to the Map
+ // of slices.
+ Map<String, Slice> slices = new HashMap<>();
+ String shardKeys = reqParams.get(ShardParams._ROUTE_);
+ for (String collectionName : collectionNames) {
+ DocCollection col = getDocCollection(collectionName, null);
+ if (col == null) {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " +
collectionName);
}
+ Collection<Slice> routeSlices =
col.getRouter().getSearchSlices(shardKeys, reqParams, col);
+ ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
- }
- NamedList<Object> cheader = new NamedList<>();
- cheader.add("status", status);
- cheader.add("QTime", timeMillis);
- if (rf != null) cheader.add(UpdateRequest.REPFACT, rf);
- if (null != toleratedErrors) {
- cheader.add("maxErrors",
ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
- cheader.add("errors", toleratedErrors);
- if (maxToleratedErrors < toleratedErrors.size()) {
- // cumulative errors are too high, we need to throw a client exception
w/correct metadata
+ // Gather URLs, grouped by leader or replica
+ List<Replica> sortedReplicas = new ArrayList<>();
+ List<Replica> replicas = new ArrayList<>();
+ for (Slice slice : slices.values()) {
+ Replica leader = slice.getLeader();
+ for (Replica replica : slice.getReplicas()) {
+ String node = replica.getNodeName();
+ if (!liveNodes.contains(node) // Must be a live node to continue
+ || replica.getState()
+ != Replica.State.ACTIVE) { // Must be an ACTIVE replica to
continue
+ continue;
+ }
+ if (sendToLeaders && replica.equals(leader)) {
+ sortedReplicas.add(replica); // put leaders here eagerly (if
sendToLeader mode)
+ } else {
+ replicas.add(replica); // replicas here
+ }
+ }
+ }
- // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(),
because if that were the
- // case then at least one shard should have thrown a real error before
this, so we don't
- // worry about having a more "singular" exception msg for that
situation
- StringBuilder msgBuf =
- new StringBuilder()
- .append(toleratedErrors.size())
- .append(" Async failures during distributed update: ");
+ // Sort the leader replicas, if any, according to the request
preferences (none if
+ // !sendToLeaders)
+ replicaListTransformer.transform(sortedReplicas);
- NamedList<String> metadata = new NamedList<>();
- for (SimpleOrderedMap<String> err : toleratedErrors) {
- ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
- metadata.add(te.getMetadataKey(), te.getMetadataValue());
+ // Sort the replicas, if any, according to the request preferences and
append to our list
+ replicaListTransformer.transform(replicas);
- msgBuf.append("\n").append(te.getMessage());
- }
+ sortedReplicas.addAll(replicas);
- SolrException toThrow =
- new SolrException(SolrException.ErrorCode.BAD_REQUEST,
msgBuf.toString());
- toThrow.setMetadata(metadata);
- throw toThrow;
+ String joinedInputCollections = StrUtils.join(inputCollections, ',');
+ Set<String> seenNodes = new HashSet<>();
+ sortedReplicas.forEach(
+ replica -> {
+ if (seenNodes.add(replica.getNodeName())) {
+ if (inputCollections.size() == 1 && collectionNames.size() == 1)
{
+ // If we have a single collection name (and not an alias to
multiple collections),
+ // send the query directly to a replica of this collection.
+ requestEndpoints.add(
+ new LBSolrClient.Endpoint(replica.getBaseUrl(),
replica.getCoreName()));
+ } else {
+ requestEndpoints.add(
+ new LBSolrClient.Endpoint(replica.getBaseUrl(),
joinedInputCollections));
+ }
+ }
+ });
+
+ if (requestEndpoints.isEmpty()) {
+ collectionStateCache.keySet().removeAll(collectionNames);
+ throw new SolrException(
+ SolrException.ErrorCode.INVALID_STATE,
+ "Could not find a healthy node to handle the request.");
}
}
- for (Map.Entry<String, NamedList<Object>> entry : versions.entrySet()) {
- condensed.add(entry.getKey(), entry.getValue());
+
+ LBSolrClient.Req req = new LBSolrClient.Req(request, requestEndpoints);
+ LBSolrClient.Rsp rsp = getLbClient().request(req);
+ return rsp.getResponse();
+ }
+
+ /**
+ * Resolves the input collections to their possible aliased collections.
Doesn't validate
+ * collection existence.
+ */
+ private Set<String> resolveAliases(List<String> inputCollections) {
+ if (inputCollections.isEmpty()) {
+ return Set.of();
}
- condensed.add("responseHeader", cheader);
- return condensed;
+ LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(); // consistent
ordering
+ for (String collectionName : inputCollections) {
+ if (getDocCollection(collectionName, -1) == null) {
+ // perhaps it's an alias
+
uniqueNames.addAll(getClusterStateProvider().resolveAlias(collectionName));
+ } else {
+ uniqueNames.add(collectionName); // it's a collection
+ }
+ }
+ return uniqueNames;
}
- @SuppressWarnings({"rawtypes"})
- public RouteResponse condenseResponse(NamedList<?> response, int timeMillis)
{
- return condenseResponse(response, timeMillis, RouteResponse::new);
+ /**
+ * If true, this client has been configured such that it will generally
prefer to send {@link
+ * SolrRequestType#UPDATE} requests to a shard leader, if and only if {@link
+ * UpdateRequest#isSendToLeaders} is also true. If false, then this client
has been configured to
+ * obey normal routing preferences when dealing with {@link
SolrRequestType#UPDATE} requests.
+ *
+ * @see #isDirectUpdatesToLeadersOnly
+ */
+ public boolean isUpdatesToLeaders() {
+ return updatesToLeaders;
}
- @SuppressWarnings({"rawtypes"})
- public static class RouteResponse<T extends LBSolrClient.Req> extends
NamedList<Object> {
- private NamedList<NamedList<?>> routeResponses;
- private Map<String, T> routes;
+ /**
+ * If true, this client has been configured such that "direct updates" will
<em>only</em> be sent
+ * to the current leader of the corresponding shard, and will not be retried
with other replicas.
+ * This setting has no effect if {@link #isUpdatesToLeaders()} or {@link
+ * UpdateRequest#isSendToLeaders} returns false.
+ *
+ * <p>A "direct update" is any update that can be sent directly to a single
shard, and does not
+ * need to be broadcast to every shard. (Example: document updates or
"delete by id" when using
+ * the default router; non-direct updates are things like commits and
"delete by query").
+ *
+ * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct
updates" for different
+ * shards, this client may break the request up and merge the responses.
+ *
+ * @return true if direct updates are sent to shard leaders only
+ */
+ public boolean isDirectUpdatesToLeadersOnly() {
+ return directUpdatesToLeadersOnly;
+ }
- public void setRouteResponses(NamedList<NamedList<?>> routeResponses) {
- this.routeResponses = routeResponses;
- }
+ /** Visible for tests so they can assert the configured refresh parallelism.
*/
+ protected int getStateRefreshParallelism() {
+ return stateRefreshParallelism;
+ }
- public NamedList<NamedList<?>> getRouteResponses() {
- return routeResponses;
+ protected DocCollection getDocCollection(String collection, Integer
expectedVersion)
+ throws SolrException {
+ if (expectedVersion == null) {
+ expectedVersion = -1;
+ }
+ if (collection == null) {
+ return null;
}
- public void setRoutes(Map<String, T> routes) {
- this.routes = routes;
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
+ if (cacheEntry != null &&
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+ collectionStateCache.remove(collection, cacheEntry);
+ cacheEntry = null;
}
- public Map<String, T> getRoutes() {
- return routes;
+ DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+
+ if (cacheEntry != null && cacheEntry.shouldRetry()) {
+ triggerCollectionRefresh(collection);
}
- }
- public static class RouteException extends SolrException {
+ if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+ return cached;
+ }
- private NamedList<Throwable> throwables;
- private Map<String, ? extends LBSolrClient.Req> routes;
+ CompletableFuture<DocCollection> refreshFuture =
triggerCollectionRefresh(collection);
+ return waitForCollectionRefresh(collection, refreshFuture);
+ }
- public RouteException(
- ErrorCode errorCode,
- NamedList<Throwable> throwables,
- Map<String, ? extends LBSolrClient.Req> routes) {
- super(errorCode, throwables.getVal(0).getMessage(),
throwables.getVal(0));
- this.throwables = throwables;
- this.routes = routes;
+ private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
+ return collectionRefreshes.compute(
+ collection,
+ (key, existingFuture) -> {
+ // A refresh is still in progress; return it.
+ if (existingFuture != null && !existingFuture.isDone()) {
+ return existingFuture;
+ }
+ // No refresh is in-progress, so trigger it.
- // create a merged copy of the metadata from all wrapped exceptions
- NamedList<String> metadata = new NamedList<String>();
- for (int i = 0; i < throwables.size(); i++) {
- Throwable t = throwables.getVal(i);
- if (t instanceof SolrException e) {
- NamedList<String> eMeta = e.getMetadata();
- if (null != eMeta) {
- metadata.addAll(eMeta);
+ if (ExecutorUtil.isShutdown(threadPool)) {
+ assert closed; // see close() for the sequence
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(key);
+ DocCollection cached = cacheEntry == null ? null :
cacheEntry.cached;
+ return CompletableFuture.completedFuture(cached);
+ } else {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ stateRefreshSemaphore.acquireUninterruptibly();
+ try {
+ return loadDocCollection(key);
+ } finally {
+ stateRefreshSemaphore.release();
+ // Remove the entry in case of many collections
+ collectionRefreshes.remove(key);
+ }
+ },
+ threadPool);
}
- }
- }
- if (0 < metadata.size()) {
- this.setMetadata(metadata);
+ });
+ }
+
+ private DocCollection waitForCollectionRefresh(
+ String collection, CompletableFuture<DocCollection> refreshFuture) {
+ try {
+ return refreshFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted while refreshing state for collection " + collection,
+ e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SolrException) {
+ throw (SolrException) cause;
}
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Error refreshing state for collection " + collection,
+ cause);
}
+ }
- public NamedList<Throwable> getThrowables() {
- return throwables;
+ private DocCollection loadDocCollection(String collection) {
+ ClusterState.CollectionRef ref = getCollectionRef(collection);
+ if (ref == null) {
+ collectionStateCache.remove(collection);
+ return null;
}
- public Map<String, ? extends LBSolrClient.Req> getRoutes() {
- return this.routes;
+ DocCollection fetchedCol = ref.get();
+ if (fetchedCol == null) {
+ collectionStateCache.remove(collection);
+ return null;
}
- }
- @Override
- public NamedList<Object> request(SolrRequest<?> request, String collection)
- throws SolrServerException, IOException {
- // the collection parameter of the request overrides that of the parameter
to this method
- String requestCollection = request.getCollection();
- if (requestCollection != null) {
- collection = requestCollection;
- } else if (collection == null) {
- collection = defaultCollection;
+ ExpiringCachedDocCollection existing =
collectionStateCache.peek(collection);
+ if (existing != null && existing.cached.getZNodeVersion() ==
fetchedCol.getZNodeVersion()) {
+ existing.setRetriedAt();
+ existing.maybeStale = false;
+ return existing.cached;
}
- List<String> inputCollections =
- collection == null ? List.of() : StrUtils.splitSmart(collection, ",",
true);
- return requestWithRetryOnStaleState(
- request,
- 0,
- inputCollections,
- /*skipStateVersion*/ false,
- Map.of(),
- /*waitedForRefresh*/ false);
+ collectionStateCache.put(collection, new
ExpiringCachedDocCollection(fetchedCol));
+ return fetchedCol;
+ }
+
+ ClusterState.CollectionRef getCollectionRef(String collection) {
+ return getClusterStateProvider().getState(collection);
}
/**
- * As this class doesn't watch external collections on the client side,
there's a chance that the
- * request will fail due to cached stale state, which means the state must
be refreshed from ZK
- * and retried.
+ * Useful for determining the minimum achieved replication factor across all
shards involved in
+ * processing an update request, typically useful for gauging the
replication factor of a batch.
*/
- protected NamedList<Object> requestWithRetryOnStaleState(
- SolrRequest<?> request,
- int retryCount,
- List<String> inputCollections,
- boolean skipStateVersion,
- Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
- boolean waitedForRefresh)
- throws SolrServerException, IOException {
- // build up a _stateVer_ param to pass to the server containing all the
- // external collection state versions involved in this request, which
allows
- // the server to notify us that our cached state for one or more of the
external
- // collections is stale and needs to be refreshed ... this code has no
impact on internal
- // collections
- String stateVerParam = null;
- List<DocCollection> requestedCollections = null;
- boolean isCollectionRequestOfV2 = false;
- if (request instanceof V2Request) {
- isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
- }
- boolean isAdmin =
- request.getRequestType() == SolrRequestType.ADMIN &&
!request.requiresCollection();
- if (!inputCollections.isEmpty()
- && !isAdmin
- && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for
admin, v2 api requests
- Set<String> requestedCollectionNames = resolveAliases(inputCollections);
-
- StringBuilder stateVerParamBuilder = null;
- for (String requestedCollection : requestedCollectionNames) {
- // track the version of state we're using on the client side using the
_stateVer_ param
- DocCollection coll = getDocCollection(requestedCollection, null);
- if (coll == null) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " +
requestedCollection);
- }
- int collVer = coll.getZNodeVersion();
- if (requestedCollections == null)
- requestedCollections = new
ArrayList<>(requestedCollectionNames.size());
- requestedCollections.add(coll);
-
- if (stateVerParamBuilder == null) {
- stateVerParamBuilder = new StringBuilder();
- } else {
- stateVerParamBuilder.append(
- "|"); // hopefully pipe is not an allowed char in a collection
name
- }
-
-
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
- }
+ public int getMinAchievedReplicationFactor(String collection, NamedList<?>
resp) {
+ // it's probably already on the top-level header set by condense
+ NamedList<?> header = (NamedList<?>) resp.get("responseHeader");
+ Integer achRf = (Integer) header.get(UpdateRequest.REPFACT);
+ if (achRf != null) return achRf.intValue();
- if (stateVerParamBuilder != null) {
- stateVerParam = stateVerParamBuilder.toString();
+ // not on the top-level header, walk the shard route tree
+ Map<String, Integer> shardRf = getShardReplicationFactor(collection, resp);
+ for (Integer rf : shardRf.values()) {
+ if (achRf == null || rf < achRf) {
+ achRf = rf;
}
}
+ return (achRf != null) ? achRf.intValue() : -1;
+ }
- if (request.getParams() instanceof ModifiableSolrParams params) {
- if (!skipStateVersion && stateVerParam != null) {
- params.set(STATE_VERSION, stateVerParam);
- } else {
- params.remove(STATE_VERSION);
- }
- } // else: ??? how to set this ???
-
- NamedList<Object> resp = null;
- try {
- resp = sendRequest(request, inputCollections);
- // to avoid an O(n) operation we always add STATE_VERSION to the last
and try to read it from
- // there
- Object o = resp == null || resp.size() == 0 ? null :
resp.get(STATE_VERSION, resp.size() - 1);
- if (o != null && o instanceof Map<?, ?> invalidStates) {
- // remove this because no one else needs this and tests would fail if
they are comparing
- // responses
- resp.remove(resp.size() - 1);
- for (Map.Entry<?, ?> e : invalidStates.entrySet()) {
- getDocCollection((String) e.getKey(), (Integer) e.getValue());
- }
- }
- } catch (Exception exc) {
-
- Throwable rootCause = SolrException.getRootCause(exc);
- // don't do retry support for admin requests
- // or if the request doesn't have a collection specified
- // or request is v2 api and its method is not GET
- if (inputCollections.isEmpty()
- || isAdmin
- || (request.getApiVersion() == SolrRequest.ApiVersion.V2
- && request.getMethod() != SolrRequest.METHOD.GET)) {
- if (exc instanceof SolrServerException) {
- throw (SolrServerException) exc;
- } else if (exc instanceof IOException) {
- throw (IOException) exc;
- } else if (exc instanceof RuntimeException) {
- throw (RuntimeException) exc;
- } else {
- throw new SolrServerException(rootCause);
+ /**
+ * Walks the NamedList response after performing an update request looking
for the replication
+ * factor that was achieved in each shard involved in the request. For
single doc updates, there
+ * will be only one shard in the return value.
+ */
+ public Map<String, Integer> getShardReplicationFactor(String collection,
NamedList<?> resp) {
+ Map<String, Integer> results = new HashMap<>();
+ if (resp instanceof RouteResponse) {
+ NamedList<NamedList<?>> routes = ((RouteResponse<?>)
resp).getRouteResponses();
+ DocCollection coll = getDocCollection(collection, null);
+ Map<String, String> leaders = new HashMap<>();
+ for (Slice slice : coll.getActiveSlices()) {
+ Replica leader = slice.getLeader();
+ if (leader != null) {
+ String leaderUrl = leader.getBaseUrl() + "/" + leader.getCoreName();
+ leaders.put(leaderUrl, slice.getName());
+ String altLeaderUrl = leader.getBaseUrl() + "/" + collection;
+ leaders.put(altLeaderUrl, slice.getName());
}
}
- int errorCode =
- (rootCause instanceof SolrException)
- ? ((SolrException) rootCause).code()
- : SolrException.ErrorCode.UNKNOWN.code;
-
- final boolean wasCommError = wasCommError(rootCause);
-
- if (wasCommError
- || (exc instanceof RouteException
- && (errorCode == 503)) // 404 because the core does not exist
503 service unavailable
- // TODO there are other reasons for 404. We need to change the solr
response format from HTML
- // to structured data to know that
- ) {
- // it was a communication error. it is likely that
- // the node to which the request to be sent is down . So , expire the
state
- // so that the next attempt would fetch the fresh state
- // just re-read state for all of them, if it has not been retried
- // in retryExpiryTime time
- if (requestedCollections != null) {
- for (DocCollection ext : requestedCollections) {
- String name = ext.getName();
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(name);
- if (cacheEntry != null) {
- if (wasCommError) {
- cacheEntry.maybeStale = true;
- } else {
- boolean markedStale =
-
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
- if (markedStale && cacheEntry.shouldRetry()) {
- triggerCollectionRefresh(name);
- }
- }
- } else {
- triggerCollectionRefresh(name);
+ Iterator<Map.Entry<String, NamedList<?>>> routeIter = routes.iterator();
+ while (routeIter.hasNext()) {
+ Map.Entry<String, NamedList<?>> next = routeIter.next();
+ String host = next.getKey();
+ NamedList<?> hostResp = next.getValue();
+ Integer rf =
+ (Integer) ((NamedList<?>)
hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+ if (rf != null) {
+ String shard = leaders.get(host);
+ if (shard == null) {
+ if (host.endsWith("/")) shard = leaders.get(host.substring(0,
host.length() - 1));
+ if (shard == null) {
+ shard = host;
}
}
+ results.put(shard, rf);
}
- if (retryCount < MAX_STALE_RETRIES) { // if it is a communication
error , we must try again
- // may be, we have a stale version of the collection state,
- // and we could not get any information from the server
- // it is probably not worth trying again and again because
- // the state would not have been updated
- log.info(
- "Request to collection {} failed due to ({}) {}, retry={}
maxRetries={} commError={} errorCode={} - retrying",
- inputCollections,
- errorCode,
- rootCause,
- retryCount,
- MAX_STALE_RETRIES,
- wasCommError,
- errorCode);
- return requestWithRetryOnStaleState(
- request,
- retryCount + 1,
- inputCollections,
- skipStateVersion,
- pendingRefreshes,
- waitedForRefresh);
- }
- } else {
- log.info("request was not communication error it seems");
}
- log.info(
- "Request to collection {} failed due to ({}) {}, retry={}
maxRetries={} commError={} errorCode={} ",
- inputCollections,
- errorCode,
- rootCause,
- retryCount,
- MAX_STALE_RETRIES,
- wasCommError,
- errorCode);
-
- boolean stateWasStale = false;
- if (retryCount < MAX_STALE_RETRIES
- && requestedCollections != null
- && !requestedCollections.isEmpty()
- && (SolrException.ErrorCode.getErrorCode(errorCode)
- == SolrException.ErrorCode.INVALID_STATE
- || errorCode == 404)) {
- // cached state for one or more external collections was stale
- // re-issue request using updated state
- stateWasStale = true;
+ }
+ return results;
+ }
- // just re-read state for all of them, which is a little heavy-handed
but hopefully a rare
- // occurrence
- for (DocCollection ext : requestedCollections) {
- collectionStateCache.remove(ext.getName());
- }
- }
+ /**
+ * Constructs {@link CloudSolrClient} instances from provided configuration.
It will use a Jetty
+ * based {@code HttpClient} if available, or will otherwise use the JDK.
+ */
+ public static class Builder {
- // if we experienced a communication error, it's worth checking the state
- // with ZK just to make sure the node we're trying to hit is still part
of the collection
- if (retryCount < MAX_STALE_RETRIES
- && !stateWasStale
- && requestedCollections != null
- && !requestedCollections.isEmpty()
- && wasCommError) {
- for (DocCollection ext : requestedCollections) {
- DocCollection latestStateFromZk = getDocCollection(ext.getName(),
null);
- if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
- // looks like we couldn't reach the server because the state was
stale == retry
- stateWasStale = true;
- // we just pulled state from ZK, so update the cache so that the
retry uses it
- collectionStateCache.put(
- ext.getName(), new
ExpiringCachedDocCollection(latestStateFromZk));
- }
- }
- }
-
- // if the state was stale, then we retry the request once with new state
pulled from Zk
- if (stateWasStale) {
- log.warn(
- "Re-trying request to collection(s) {} after stale state error
from server.",
- inputCollections);
-
- Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor =
pendingRefreshes;
- if (!waitedForRefresh && (pendingRefreshes == null ||
pendingRefreshes.isEmpty())) {
- refreshesToWaitFor = new HashMap<>();
- for (DocCollection ext : requestedCollections) {
- refreshesToWaitFor.put(ext.getName(),
triggerCollectionRefresh(ext.getName()));
- }
- }
+ protected Collection<String> zkHosts = new ArrayList<>();
+ protected List<String> solrUrls = new ArrayList<>();
+ protected String zkChroot;
+ protected HttpSolrClient httpClient;
+ protected boolean shardLeadersOnly = true;
+ protected boolean directUpdatesToLeadersOnly = false;
+ protected boolean parallelUpdates = true;
+ protected ClusterStateProvider stateProvider;
+ protected HttpSolrClient.BuilderBase<?, ?> internalClientBuilder;
+ protected RequestWriter requestWriter;
+ protected ResponseParser responseParser;
+ protected long retryExpiryTimeNano =
+ TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3
billion nanos
- // First retry without sending state versions so the server does not
immediately reject the
- // request while we intentionally rely on stale routing (e.g., to
allow forwarding to a new
- // leader) as the background refresh completes.
- if (!skipStateVersion && !waitedForRefresh) {
- resp =
- requestWithRetryOnStaleState(
- request,
- retryCount + 1,
- inputCollections,
- /*skipStateVersion*/ true,
- refreshesToWaitFor,
- waitedForRefresh);
- } else if (!waitedForRefresh
- && refreshesToWaitFor != null
- && !refreshesToWaitFor.isEmpty()) {
- for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
- refreshesToWaitFor.entrySet()) {
- waitForCollectionRefresh(entry.getKey(), entry.getValue());
- }
- resp =
- requestWithRetryOnStaleState(
- request,
- retryCount + 1,
- inputCollections,
- /*skipStateVersion*/ false,
- Map.of(),
- /*waitedForRefresh*/ true);
- } else {
- resp =
- requestWithRetryOnStaleState(
- request,
- retryCount + 1,
- inputCollections,
- /*skipStateVersion*/ false,
- Map.of(),
- /*waitedForRefresh*/ waitedForRefresh);
- }
- } else {
- if (exc instanceof SolrException
- || exc instanceof SolrServerException
- || exc instanceof IOException) {
- throw exc;
- } else {
- throw new SolrServerException(rootCause);
- }
- }
+ protected String defaultCollection;
+ protected long timeToLiveSeconds = 60;
+ protected int parallelCacheRefreshesLocks =
DEFAULT_STATE_REFRESH_PARALLELISM;
+ protected int zkConnectTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
+ protected int zkClientTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
+ protected boolean canUseZkACLs = true;
- if (requestedCollections != null) {
- requestedCollections.clear(); // done with this
- }
+ /**
+ * Provide a series of Solr URLs to be used when configuring {@link
CloudSolrClient} instances.
+ * The solr client will use these urls to understand the cluster topology,
which solr nodes are
+ * active etc.
+ *
+ * <p>Provided Solr URLs are expected to point to the root Solr path
+ * ("http://hostname:8983/solr"); they should not include any collections,
cores, or other path
+ * components.
+ *
+ * <p>Usage example:
+ *
+ * <pre>
+ * final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
+ * solrBaseUrls.add("http://solr1:8983/solr");
solrBaseUrls.add("http://solr2:8983/solr");
solrBaseUrls.add("http://solr3:8983/solr");
+ * final SolrClient client = new
CloudSolrClient.Builder(solrBaseUrls).build();
+ * </pre>
+ */
+ public Builder(List<String> solrUrls) {
+ this.solrUrls = solrUrls;
}
- return resp;
- }
-
- protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String>
inputCollections)
- throws SolrServerException, IOException {
- boolean sendToLeaders = false;
-
- if (request.getRequestType() == SolrRequestType.UPDATE) {
- sendToLeaders = this.isUpdatesToLeaders();
-
- if (sendToLeaders && request instanceof UpdateRequest updateRequest) {
- sendToLeaders = sendToLeaders && updateRequest.isSendToLeaders();
-
- // Check if we can do a "directUpdate" ...
- if (sendToLeaders) {
- if (inputCollections.size() > 1) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST,
- "Update request must be sent to a single collection "
- + "or an alias: "
- + inputCollections);
- }
- String collection =
- inputCollections.isEmpty()
- ? null
- : inputCollections.get(0); // getting first mimics
HttpSolrCall
- NamedList<Object> response = directUpdate(updateRequest, collection);
- if (response != null) {
- return response;
- }
- }
- }
+ /**
+ * Provide a series of ZK hosts which will be used when configuring {@link
CloudSolrClient}
+ * instances.
+ *
+ * <p>Usage example when Solr stores data at the ZooKeeper root ('/'):
+ *
+ * <pre>
+ * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+ * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181");
zkServers.add("zookeeper3:2181");
+ * final SolrClient client = new CloudSolrClient.Builder(zkServers,
Optional.empty()).build();
+ * </pre>
+ *
+ * Usage example when Solr data is stored in a ZooKeeper chroot:
+ *
+ * <pre>
+ * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+ * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181");
zkServers.add("zookeeper3:2181");
+ * final SolrClient client = new CloudSolrClient.Builder(zkServers,
Optional.of("/solr")).build();
+ * </pre>
+ *
+ * @param zkHosts a List of at least one ZooKeeper host and port (e.g.
"zookeeper1:2181")
+ * @param zkChroot the path to the root ZooKeeper node containing Solr
data. Provide {@code
+ * java.util.Optional.empty()} if no ZK chroot is used.
+ * @deprecated Use a connectionString constructor and/or prefer HTTP URLs
instead.
+ */
+ @Deprecated(since = "10.1") // sort of 10.0 but accidentally removed
+ public Builder(List<String> zkHosts, Optional<String> zkChroot) {
+ this.zkHosts = zkHosts;
+ if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
}
- SolrParams reqParams = request.getParams();
- assert reqParams != null;
-
- ReplicaListTransformer replicaListTransformer =
- requestRLTGenerator.getReplicaListTransformer(reqParams);
-
- final ClusterStateProvider provider = getClusterStateProvider();
- final String urlScheme = provider.getUrlScheme();
- final Set<String> liveNodes = provider.getLiveNodes();
-
- final List<LBSolrClient.Endpoint> requestEndpoints =
- new ArrayList<>(); // we populate this as follows...
-
- if (request.getApiVersion() == SolrRequest.ApiVersion.V2) {
- if (!liveNodes.isEmpty()) {
- List<String> liveNodesList = new ArrayList<>(liveNodes);
- Collections.shuffle(liveNodesList, rand);
- final var chosenNodeUrl =
Utils.getBaseUrlForNodeName(liveNodesList.get(0), urlScheme);
- requestEndpoints.add(new LBSolrClient.Endpoint(chosenNodeUrl));
- }
-
- } else if (!request.requiresCollection()) {
- for (String liveNode : liveNodes) {
- final var nodeBaseUrl = Utils.getBaseUrlForNodeName(liveNode,
urlScheme);
- requestEndpoints.add(new LBSolrClient.Endpoint(nodeBaseUrl));
- }
- } else { // API call to a particular collection / core / alias (i.e.
- // request.requiresCollection() == true)
- Set<String> collectionNames = resolveAliases(inputCollections);
- if (collectionNames.isEmpty()) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST,
- "No collection param specified on request and no default
collection has been set: "
- + inputCollections);
- }
-
- List<String> preferredNodes = request.getPreferredNodes();
- if (preferredNodes != null && !preferredNodes.isEmpty()) {
- String joinedInputCollections = StrUtils.join(inputCollections, ',');
- final var endpoints =
- preferredNodes.stream()
- .map(nodeName -> Utils.getBaseUrlForNodeName(nodeName,
urlScheme))
- .map(nodeUrl -> new LBSolrClient.Endpoint(nodeUrl,
joinedInputCollections))
- .collect(Collectors.toList());
- if (!endpoints.isEmpty()) {
- LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
- LBSolrClient.Rsp rsp = getLbClient().request(req);
- return rsp.getResponse();
- }
- }
-
- // TODO: not a big deal because of the caching, but we could avoid
looking
- // at every shard when getting leaders if we tweaked some things
+ /** for an expert use-case */
+ public Builder(ClusterStateProvider stateProvider) {
+ this.stateProvider = stateProvider;
+ }
- // Retrieve slices from the cloud state and, for each collection
specified, add it to the Map
- // of slices.
- Map<String, Slice> slices = new HashMap<>();
- String shardKeys = reqParams.get(ShardParams._ROUTE_);
- for (String collectionName : collectionNames) {
- DocCollection col = getDocCollection(collectionName, null);
- if (col == null) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " +
collectionName);
- }
- Collection<Slice> routeSlices =
col.getRouter().getSearchSlices(shardKeys, reqParams, col);
- ClientUtils.addSlices(slices, collectionName, routeSlices, true);
- }
+ /**
+ * Creates a client builder based on a connection string of 2 possible
formats:
+ *
+ * <ul>
+ * <li>ZooKeeper connection string (optionally with chroot), e.g. {@code
+ * zk1:2181,zk2:2181,zk3:2181/solr}
+ * <li>Comma-separated list of Solr node base URLs (HTTP or HTTPS), e.g.
{@code
+ * http://solr1:8983/solr,http://solr2:8983/solr}
+ * </ul>
+ *
+ * @param connectionString a string specifying either ZooKeeper connection
string or HTTP(S)
+ * Solr URLs
+ * @throws IllegalArgumentException if string is null, empty, or malformed
+ */
+ public Builder(String connectionString) {
+ this(CloudSolrClientConnection.parse(connectionString));
+ }
- // Gather URLs, grouped by leader or replica
- List<Replica> sortedReplicas = new ArrayList<>();
- List<Replica> replicas = new ArrayList<>();
- for (Slice slice : slices.values()) {
- Replica leader = slice.getLeader();
- for (Replica replica : slice.getReplicas()) {
- String node = replica.getNodeName();
- if (!liveNodes.contains(node) // Must be a live node to continue
- || replica.getState()
- != Replica.State.ACTIVE) { // Must be an ACTIVE replica to
continue
- continue;
- }
- if (sendToLeaders && replica.equals(leader)) {
- sortedReplicas.add(replica); // put leaders here eagerly (if
sendToLeader mode)
- } else {
- replicas.add(replica); // replicas here
- }
- }
+ /**
+ * Creates a client builder from a {@link CloudSolrClientConnection}.
+ *
+ * @param connection instance of {@link CloudSolrClientConnection}, which
can be obtained from
+ * the solr connection string or created via the constructor
+ */
+ public Builder(CloudSolrClientConnection connection) {
+ if (connection.isZookeeper()) {
+ this.zkHosts = connection.quorumItems();
+ this.zkChroot = connection.zkChroot();
+ } else {
+ this.solrUrls = connection.quorumItems();
}
+ }
- // Sort the leader replicas, if any, according to the request
preferences (none if
- // !sendToLeaders)
- replicaListTransformer.transform(sortedReplicas);
+ /** Whether to use the default ZK ACLs when building a ZK Client. */
+ public Builder canUseZkACLs(boolean canUseZkACLs) {
+ this.canUseZkACLs = canUseZkACLs;
+ return this;
+ }
- // Sort the replicas, if any, according to the request preferences and
append to our list
- replicaListTransformer.transform(replicas);
+ /**
+ * Tells {@link Builder} that created clients should be configured such
that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
+ *
+ * @see #sendUpdatesToAnyReplica
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ public Builder sendUpdatesOnlyToShardLeaders() {
+ shardLeadersOnly = true;
+ return this;
+ }
- sortedReplicas.addAll(replicas);
+ /**
+ * Tells {@link Builder} that created clients should be configured such
that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
+ *
+ * @see #sendUpdatesOnlyToShardLeaders
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ public Builder sendUpdatesToAnyReplica() {
+ shardLeadersOnly = false;
+ return this;
+ }
- String joinedInputCollections = StrUtils.join(inputCollections, ',');
- Set<String> seenNodes = new HashSet<>();
- sortedReplicas.forEach(
- replica -> {
- if (seenNodes.add(replica.getNodeName())) {
- if (inputCollections.size() == 1 && collectionNames.size() == 1)
{
- // If we have a single collection name (and not an alias to
multiple collection),
- // send the query directly to a replica of this collection.
- requestEndpoints.add(
- new LBSolrClient.Endpoint(replica.getBaseUrl(),
replica.getCoreName()));
- } else {
- requestEndpoints.add(
- new LBSolrClient.Endpoint(replica.getBaseUrl(),
joinedInputCollections));
- }
- }
- });
+ /**
+ * Tells {@link CloudSolrClient.Builder} that created clients should send
direct updates to
+ * shard leaders only.
+ *
+ * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the
client side with a
+ * {@link SolrException}
+ *
+ * @see #sendDirectUpdatesToAnyShardReplica
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
+ */
+ public Builder sendDirectUpdatesToShardLeadersOnly() {
+ directUpdatesToLeadersOnly = true;
+ return this;
+ }
- if (requestEndpoints.isEmpty()) {
- collectionStateCache.keySet().removeAll(collectionNames);
- throw new SolrException(
- SolrException.ErrorCode.INVALID_STATE,
- "Could not find a healthy node to handle the request.");
- }
+ /**
+ * Tells {@link CloudSolrClient.Builder} that created clients can send
updates to any shard
+ * replica (shard leaders and non-leaders).
+ *
+ * <p>Shard leaders are still preferred, but the created clients will fall
back to using other
+ * replicas if a leader cannot be found.
+ *
+ * @see #sendDirectUpdatesToShardLeadersOnly
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
+ */
+ public Builder sendDirectUpdatesToAnyShardReplica() {
+ directUpdatesToLeadersOnly = false;
+ return this;
}
- LBSolrClient.Req req = new LBSolrClient.Req(request, requestEndpoints);
- LBSolrClient.Rsp rsp = getLbClient().request(req);
- return rsp.getResponse();
- }
-
- /**
- * Resolves the input collections to their possible aliased collections.
Doesn't validate
- * collection existence.
- */
- private Set<String> resolveAliases(List<String> inputCollections) {
- if (inputCollections.isEmpty()) {
- return Set.of();
- }
- LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(); // consistent
ordering
- for (String collectionName : inputCollections) {
- if (getDocCollection(collectionName, -1) == null) {
- // perhaps it's an alias
-
uniqueNames.addAll(getClusterStateProvider().resolveAlias(collectionName));
- } else {
- uniqueNames.add(collectionName); // it's a collection
- }
+ /** Provides a {@link RequestWriter} for created clients to use when
handling requests. */
+ public Builder withRequestWriter(RequestWriter requestWriter) {
+ this.requestWriter = requestWriter;
+ return this;
}
- return uniqueNames;
- }
- /**
- * If true, this client has been configured such that it will generally
prefer to send {@link
- * SolrRequestType#UPDATE} requests to a shard leader, if and only if {@link
- * UpdateRequest#isSendToLeaders} is also true. If false, then this client
has been configured to
- * obey normal routing preferences when dealing with {@link
SolrRequestType#UPDATE} requests.
- *
- * @see #isDirectUpdatesToLeadersOnly
- */
- public boolean isUpdatesToLeaders() {
- return updatesToLeaders;
- }
+ /** Provides a {@link ResponseParser} for created clients to use when
handling requests. */
+ public Builder withResponseParser(ResponseParser responseParser) {
+ this.responseParser = responseParser;
+ return this;
+ }
- /**
- * If true, this client has been configured such that "direct updates" will
<em>only</em> be sent
- * to the current leader of the corresponding shard, and will not be retried
with other replicas.
- * This method has no effect if {@link #isUpdatesToLeaders()} or {@link
- * UpdateRequest#isSendToLeaders} returns false.
- *
- * <p>A "direct update" is any update that can be sent directly to a single
shard, and does not
- * need to be broadcast to every shard. (Example: document updates or
"delete by id" when using
- * the default router; non-direct updates are things like commits and
"delete by query").
- *
- * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct
updates" for different
- * shards, this client may break the request up and merge the responses.
- *
- * @return true if direct updates are sent to shard leaders only
- */
- public boolean isDirectUpdatesToLeadersOnly() {
- return directUpdatesToLeadersOnly;
- }
+ /**
+ * Tells {@link CloudSolrClient.Builder} whether created clients should
send shard updates
+ * serially or in parallel.
+ *
+ * <p>When an {@link UpdateRequest} affects multiple shards, {@link
CloudSolrClient} splits it
+ * up and sends a request to each affected shard. This setting chooses
whether those
+ * sub-requests are sent serially or in parallel.
+ *
+ * <p>If not set, this defaults to 'true' and sends sub-requests in
parallel.
+ */
+ public Builder withParallelUpdates(boolean parallelUpdates) {
+ this.parallelUpdates = parallelUpdates;
+ return this;
+ }
- /** Visible for tests so they can assert the configured refresh parallelism.
*/
- protected int getStateRefreshParallelism() {
- return stateRefreshParallelism;
- }
+ /**
+ * Configures how many collection state refresh operations may run in
parallel using a dedicated
+ * thread pool. This controls the maximum number of concurrent
ZooKeeper/cluster state lookups.
+ *
+ * <p>Defaults to 5.
+ */
+ public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks)
{
+ this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
+ return this;
+ }
- protected DocCollection getDocCollection(String collection, Integer
expectedVersion)
- throws SolrException {
- if (expectedVersion == null) {
- expectedVersion = -1;
+ /**
+ * This is the time to wait to re-fetch the state after getting the same
state version from ZK
+ */
+ public Builder withRetryExpiryTime(long expiryTime, TimeUnit unit) {
+ this.retryExpiryTimeNano = TimeUnit.NANOSECONDS.convert(expiryTime,
unit);
+ return this;
}
- if (collection == null) {
- return null;
+
+ /** Sets the default collection for requests. */
+ public Builder withDefaultCollection(String defaultCollection) {
+ this.defaultCollection = defaultCollection;
+ return this;
}
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
- if (cacheEntry != null &&
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
- collectionStateCache.remove(collection, cacheEntry);
- cacheEntry = null;
+ /**
+ * Sets the cache TTL for cached DocCollection objects.
+ *
+ * @param timeToLive ttl value
+ */
+ public Builder withCollectionCacheTtl(long timeToLive, TimeUnit unit) {
+ assert timeToLive > 0;
+ this.timeToLiveSeconds = TimeUnit.SECONDS.convert(timeToLive, unit);
+ return this;
}
- DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+ /**
+ * Set the internal Solr HTTP client.
+ *
+ * <p>Note: closing the client instance is the responsibility of the
caller.
+ *
+ * @return this
+ */
+ public Builder withHttpClient(HttpSolrClient httpSolrClient) {
+ if (this.internalClientBuilder != null) {
+ throw new IllegalStateException(
+ "The builder can't accept an httpClient AND an
internalClientBuilder, only one of those can be provided");
+ }
+ this.httpClient = httpSolrClient;
+ return this;
+ }
- if (cacheEntry != null && cacheEntry.shouldRetry()) {
- triggerCollectionRefresh(collection);
+ /**
+ * If provided, the CloudSolrClient will build its internal client using
this builder (instead
+ * of the empty default one). Providing this builder allows users to
configure the internal
+ * clients (authentication, timeouts, etc.).
+ *
+ * @param internalClientBuilder the builder to use for creating the
internal http client.
+ * @return this
+ */
+ public Builder withHttpClientBuilder(HttpSolrClient.BuilderBase<?, ?>
internalClientBuilder) {
+ if (this.httpClient != null) {
+ throw new IllegalStateException(
+ "The builder can't accept an httpClient AND an
internalClientBuilder, only one of those can be provided");
+ }
+ this.internalClientBuilder = internalClientBuilder;
+ return this;
}
- if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
- return cached;
+ @Deprecated(since = "9.10")
+ public Builder withInternalClientBuilder(
+ HttpSolrClient.BuilderBase<?, ?> internalClientBuilder) {
+ return withHttpClientBuilder(internalClientBuilder);
}
- CompletableFuture<DocCollection> refreshFuture =
triggerCollectionRefresh(collection);
- return waitForCollectionRefresh(collection, refreshFuture);
- }
+ /**
+ * Sets the Zk connection timeout
+ *
+ * @param zkConnectTimeout timeout value
+ * @param unit time unit
+ */
+ public Builder withZkConnectTimeout(int zkConnectTimeout, TimeUnit unit) {
+ this.zkConnectTimeout = Math.toIntExact(unit.toMillis(zkConnectTimeout));
+ return this;
+ }
- private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
- return collectionRefreshes.compute(
- collection,
- (key, existingFuture) -> {
- // A refresh is still in progress; return it.
- if (existingFuture != null && !existingFuture.isDone()) {
- return existingFuture;
- }
- // No refresh is in-progress, so trigger it.
+ /**
+ * Sets the Zk client session timeout
+ *
+ * @param zkClientTimeout timeout value
+ * @param unit time unit
+ */
+ public Builder withZkClientTimeout(int zkClientTimeout, TimeUnit unit) {
+ this.zkClientTimeout = Math.toIntExact(unit.toMillis(zkClientTimeout));
+ return this;
+ }
- if (ExecutorUtil.isShutdown(threadPool)) {
- assert closed; // see close() for the sequence
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(key);
- DocCollection cached = cacheEntry == null ? null :
cacheEntry.cached;
- return CompletableFuture.completedFuture(cached);
- } else {
- return CompletableFuture.supplyAsync(
- () -> {
- stateRefreshSemaphore.acquireUninterruptibly();
- try {
- return loadDocCollection(key);
- } finally {
- stateRefreshSemaphore.release();
- // Remove the entry in case of many collections
- collectionRefreshes.remove(key);
- }
- },
- threadPool);
- }
- });
- }
+ /** Create a {@link CloudSolrClient} based on the provided configuration.
*/
+ public CloudHttp2SolrClient build() {
+ int providedOptions = 0;
+ if (!zkHosts.isEmpty()) providedOptions++;
+ if (!solrUrls.isEmpty()) providedOptions++;
+ if (stateProvider != null) providedOptions++;
- private DocCollection waitForCollectionRefresh(
- String collection, CompletableFuture<DocCollection> refreshFuture) {
- try {
- return refreshFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(
- SolrException.ErrorCode.SERVER_ERROR,
- "Interrupted while refreshing state for collection " + collection,
- e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof SolrException) {
- throw (SolrException) cause;
+ if (providedOptions > 1) {
+ throw new IllegalArgumentException(
+ "Only one of zkHost(s), solrUrl(s), or stateProvider should be
specified.");
+ } else if (providedOptions == 0) {
+ throw new IllegalArgumentException(
+ "One of zkHosts, solrUrls, or stateProvider must be specified.");
}
- throw new SolrException(
- SolrException.ErrorCode.SERVER_ERROR,
- "Error refreshing state for collection " + collection,
- cause);
+
+ return new CloudHttp2SolrClient(this);
}
- }
- private DocCollection loadDocCollection(String collection) {
- ClusterState.CollectionRef ref = getCollectionRef(collection);
- if (ref == null) {
- collectionStateCache.remove(collection);
- return null;
+ protected HttpSolrClient createOrGetHttpClient() {
+ if (httpClient != null) {
+ return httpClient;
+ } else if (internalClientBuilder != null) {
+ return internalClientBuilder.build();
+ } else {
+ return HttpSolrClient.builder(null).build();
+ }
}
- DocCollection fetchedCol = ref.get();
- if (fetchedCol == null) {
- collectionStateCache.remove(collection);
- return null;
+ protected LBSolrClient createOrGetLbClient(HttpSolrClient myClient) {
+ return myClient.createLBSolrClient();
}
- ExpiringCachedDocCollection existing =
collectionStateCache.peek(collection);
- if (existing != null && existing.cached.getZNodeVersion() ==
fetchedCol.getZNodeVersion()) {
- existing.setRetriedAt();
- existing.maybeStale = false;
- return existing.cached;
+ protected ClusterStateProvider createZkClusterStateProvider() {
+ ClusterStateProvider stateProvider =
+ ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot,
canUseZkACLs);
+ if (stateProvider instanceof
SolrZkClientTimeout.SolrZkClientTimeoutAware timeoutAware) {
+ timeoutAware.setZkClientTimeout(zkClientTimeout);
+ timeoutAware.setZkConnectTimeout(zkConnectTimeout);
+ }
+ return stateProvider;
}
- collectionStateCache.put(collection, new
ExpiringCachedDocCollection(fetchedCol));
- return fetchedCol;
- }
-
- ClusterState.CollectionRef getCollectionRef(String collection) {
- return getClusterStateProvider().getState(collection);
+ protected ClusterStateProvider
createHttpClusterStateProvider(HttpSolrClient httpClient) {
+ try {
+ return new HttpClusterStateProvider<>(solrUrls, httpClient);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Couldn't initialize a HttpClusterStateProvider (is/are the "
+ + "Solr server(s), "
+ + solrUrls
+ + ", down?)",
+ e);
+ }
+ }
}
- /**
- * Useful for determining the minimum achieved replication factor across all
shards involved in
- * processing an update request, typically useful for gauging the
replication factor of a batch.
- */
- public int getMinAchievedReplicationFactor(String collection, NamedList<?>
resp) {
- // it's probably already on the top-level header set by condense
- NamedList<?> header = (NamedList<?>) resp.get("responseHeader");
- Integer achRf = (Integer) header.get(UpdateRequest.REPFACT);
- if (achRf != null) return achRf.intValue();
+ protected static class StateCache extends ConcurrentHashMap<String,
ExpiringCachedDocCollection> {
+ final AtomicLong puts = new AtomicLong();
+ final AtomicLong hits = new AtomicLong();
+ final Lock evictLock = new ReentrantLock(true);
+ public volatile long timeToLiveMs = 60 * 1000L;
- // not on the top-level header, walk the shard route tree
- Map<String, Integer> shardRf = getShardReplicationFactor(collection, resp);
- for (Integer rf : shardRf.values()) {
- if (achRf == null || rf < achRf) {
- achRf = rf;
+ @Override
+ public ExpiringCachedDocCollection get(Object key) {
+ ExpiringCachedDocCollection val = super.get(key);
+ if (val == null) {
+ // a new collection is likely to be added now.
+ // check if there are stale items and remove them
+ evictStale();
+ return null;
+ }
+ if (val.isExpired(timeToLiveMs)) {
+ super.remove(key);
+ return null;
}
+ hits.incrementAndGet();
+ return val;
}
- return (achRf != null) ? achRf.intValue() : -1;
- }
- /**
- * Walks the NamedList response after performing an update request looking
for the replication
- * factor that was achieved in each shard involved in the request. For
single doc updates, there
- * will be only one shard in the return value.
- */
- public Map<String, Integer> getShardReplicationFactor(String collection,
NamedList<?> resp) {
- Map<String, Integer> results = new HashMap<>();
- if (resp instanceof RouteResponse) {
- NamedList<NamedList<?>> routes = ((RouteResponse<?>)
resp).getRouteResponses();
- DocCollection coll = getDocCollection(collection, null);
- Map<String, String> leaders = new HashMap<>();
- for (Slice slice : coll.getActiveSlices()) {
- Replica leader = slice.getLeader();
- if (leader != null) {
- String leaderUrl = leader.getBaseUrl() + "/" + leader.getCoreName();
- leaders.put(leaderUrl, slice.getName());
- String altLeaderUrl = leader.getBaseUrl() + "/" + collection;
- leaders.put(altLeaderUrl, slice.getName());
- }
- }
+ ExpiringCachedDocCollection peek(Object key) {
+ return super.get(key);
+ }
- Iterator<Map.Entry<String, NamedList<?>>> routeIter = routes.iterator();
- while (routeIter.hasNext()) {
- Map.Entry<String, NamedList<?>> next = routeIter.next();
- String host = next.getKey();
- NamedList<?> hostResp = next.getValue();
- Integer rf =
- (Integer) ((NamedList<?>)
hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
- if (rf != null) {
- String shard = leaders.get(host);
- if (shard == null) {
- if (host.endsWith("/")) shard = leaders.get(host.substring(0,
host.length() - 1));
- if (shard == null) {
- shard = host;
- }
+ @Override
+ public ExpiringCachedDocCollection put(String key,
ExpiringCachedDocCollection value) {
+ puts.incrementAndGet();
+ return super.put(key, value);
+ }
+
+ void evictStale() {
+ if (!evictLock.tryLock()) return;
+ try {
+ for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
+ if (e.getValue().isExpired(timeToLiveMs)) {
+ super.remove(e.getKey());
}
- results.put(shard, rf);
}
+ } finally {
+ evictLock.unlock();
}
}
- return results;
}
- /**
- * Determines whether an UpdateRequest contains sufficient routing
information to identify shard
- * leaders for direct updates when directUpdatesToLeadersOnly is enabled.
- */
- private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest,
String idField) {
- final Map<SolrInputDocument, Map<String, Object>> documents =
updateRequest.getDocumentsMap();
- final Map<String, Map<String, Object>> deleteById =
updateRequest.getDeleteByIdMap();
+ @SuppressWarnings({"rawtypes"})
+ public static class RouteResponse<T extends LBSolrClient.Req> extends
NamedList<Object> {
+ private NamedList<NamedList<?>> routeResponses;
+ private Map<String, T> routes;
- final boolean hasNoDocuments = (documents == null || documents.isEmpty());
- final boolean hasNoDeleteById = (deleteById == null ||
deleteById.isEmpty());
- if (hasNoDocuments && hasNoDeleteById) {
- // no documents and no delete-by-id, so no info to find leader(s)
- return false;
+ public void setRouteResponses(NamedList<NamedList<?>> routeResponses) {
+ this.routeResponses = routeResponses;
}
- if (documents != null) {
- for (final Map.Entry<SolrInputDocument, Map<String, Object>> entry :
documents.entrySet()) {
- final SolrInputDocument doc = entry.getKey();
- final Object fieldValue = doc.getFieldValue(idField);
- if (fieldValue == null) {
- // a document with no id field value, so can't find leader for it
- return false;
- }
- }
+ public NamedList<NamedList<?>> getRouteResponses() {
+ return routeResponses;
}
- if (deleteById != null) {
- for (final Map.Entry<String, Map<String, Object>> entry :
deleteById.entrySet()) {
- final Map<String, Object> params = entry.getValue();
- if (params == null || params.get(ShardParams._ROUTE_) == null) {
- // deleteById entry lacks explicit route parameter, can't find
leader for it
- return false;
+ public void setRoutes(Map<String, T> routes) {
+ this.routes = routes;
+ }
+
+ public Map<String, T> getRoutes() {
+ return routes;
+ }
+ }
+
+ public static class RouteException extends SolrException {
+
+ private NamedList<Throwable> throwables;
+ private Map<String, ? extends LBSolrClient.Req> routes;
+
+ public RouteException(
+ ErrorCode errorCode,
+ NamedList<Throwable> throwables,
+ Map<String, ? extends LBSolrClient.Req> routes) {
+ super(errorCode, throwables.getVal(0).getMessage(),
throwables.getVal(0));
+ this.throwables = throwables;
+ this.routes = routes;
+
+ // create a merged copy of the metadata from all wrapped exceptions
+ NamedList<String> metadata = new NamedList<String>();
+ for (int i = 0; i < throwables.size(); i++) {
+ Throwable t = throwables.getVal(i);
+ if (t instanceof SolrException e) {
+ NamedList<String> eMeta = e.getMetadata();
+ if (null != eMeta) {
+ metadata.addAll(eMeta);
+ }
}
}
+ if (0 < metadata.size()) {
+ this.setMetadata(metadata);
+ }
}
- return true;
+ public NamedList<Throwable> getThrowables() {
+ return throwables;
+ }
+
+ public Map<String, ? extends LBSolrClient.Req> getRoutes() {
+ return this.routes;
+ }
}
/** Universal connection string parser logic. */
@@ -1887,4 +1833,55 @@ public abstract class CloudSolrClient extends SolrClient
{
return String.join(",", quorumItems) + (isZookeeper && zkChroot != null
? zkChroot : "");
}
}
+
+ class ExpiringCachedDocCollection {
+ final DocCollection cached;
+ final long cachedAtNano;
+ // This is the time at which the collection was retried and got the same
old version
+ volatile long retriedAtNano = -1;
+ // flag that suggests that this is potentially to be rechecked
+ volatile boolean maybeStale = false;
+
+ ExpiringCachedDocCollection(DocCollection cached) {
+ this.cached = cached;
+ this.cachedAtNano = System.nanoTime();
+ }
+
+ boolean isExpired(long timeToLiveMs) {
+ return (System.nanoTime() - cachedAtNano)
+ > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
+ }
+
+ boolean shouldRetry() {
+ if (maybeStale) { // we are not sure if it is stale so check with retry
time
+ if ((retriedAtNano == -1 || (System.nanoTime() - retriedAtNano) >
retryExpiryTimeNano)) {
+ return true; // we retried a while back. and we could not get
anything new.
+ // it's likely that it is not going to be available now also.
+ }
+ }
+ return false;
+ }
+
+ void setRetriedAt() {
+ retriedAtNano = System.nanoTime();
+ }
+
+ /**
+ * Marks this entry as {@code maybeStale} if the provided backoff window
has elapsed since the
+ * last retry.
+ *
+ * @return {@code true} if the entry was flagged as maybe stale
+ */
+ boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
+ if (maybeStale) {
+ return true;
+ }
+ long lastRetry = retriedAtNano;
+ if (lastRetry != -1 && (System.nanoTime() - lastRetry) <=
retryBackoffNano) {
+ return false;
+ }
+ maybeStale = true;
+ return true;
+ }
+ }
}