This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 86bc6f29224 SOLR-14070: Deprecate CloudSolrClient ZkHost constructor 
(#4533)
86bc6f29224 is described below

commit 86bc6f292245e0566511a9d9c3fb8aaba933e564
Author: David Smiley <[email protected]>
AuthorDate: Thu Jun 18 10:44:17 2026 -0400

    SOLR-14070: Deprecate CloudSolrClient ZkHost constructor (#4533)
    
    -- again; was accidentally un-deprecated for 10.0.  Oops.
---
 .../solr/client/solrj/impl/CloudSolrClient.java    | 2935 ++++++++++----------
 1 file changed, 1466 insertions(+), 1469 deletions(-)

diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java 
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 69e5c8d5809..b9b52442297 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -92,25 +92,12 @@ import org.slf4j.MDC;
  */
 public abstract class CloudSolrClient extends SolrClient {
 
+  public static final String STATE_VERSION = "_stateVer_";
+  static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5;
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   // no of times collection state to be reloaded if stale state error is 
received
   private static final int MAX_STALE_RETRIES =
       
Integer.parseInt(System.getProperty("solr.solrj.cloud.max.stale.retries", "5"));
-  static final int DEFAULT_STATE_REFRESH_PARALLELISM = 5;
-  private final Random rand = new Random();
-
-  private final boolean updatesToLeaders;
-  private final boolean directUpdatesToLeadersOnly;
-  private final RequestReplicaListTransformerGenerator requestRLTGenerator;
-  private final boolean parallelUpdates;
-  private final ExecutorService threadPool =
-      ExecutorUtil.newMDCAwareCachedThreadPool(
-          new SolrNamedThreadFactory("CloudSolrClient ThreadPool"));
-
-  public static final String STATE_VERSION = "_stateVer_";
-  protected long retryExpiryTimeNano =
-      TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3 
million nanos
   private static final Set<String> NON_ROUTABLE_PARAMS =
       Set.of(
           UpdateParams.EXPUNGE_DELETES,
@@ -125,1713 +112,1672 @@ public abstract class CloudSolrClient extends 
SolrClient {
           // Not supported via SolrCloud
           // UpdateParams.ROLLBACK
           );
-
+  protected final StateCache collectionStateCache = new StateCache();
+  private final Random rand = new Random();
+  private final boolean updatesToLeaders;
+  private final boolean directUpdatesToLeadersOnly;
+  private final RequestReplicaListTransformerGenerator requestRLTGenerator;
+  private final boolean parallelUpdates;
+  private final ExecutorService threadPool =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          new SolrNamedThreadFactory("CloudSolrClient ThreadPool"));
   private final ConcurrentHashMap<String, CompletableFuture<DocCollection>> 
collectionRefreshes =
       new ConcurrentHashMap<>();
   private final Semaphore stateRefreshSemaphore;
   private final int stateRefreshParallelism;
+  protected long retryExpiryTimeNano =
+      TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3 
million nanos
   private volatile boolean closed;
 
-  /**
-   * Constructs {@link CloudSolrClient} instances from provided configuration. 
It will use a Jetty
-   * based {@code HttpClient} if available, or will otherwise use the JDK.
-   */
-  public static class Builder {
+  protected CloudSolrClient(
+      boolean updatesToLeaders, boolean parallelUpdates, boolean 
directUpdatesToLeadersOnly) {
+    this(
+        updatesToLeaders,
+        parallelUpdates,
+        directUpdatesToLeadersOnly,
+        DEFAULT_STATE_REFRESH_PARALLELISM);
+  }
 
-    protected Collection<String> zkHosts = new ArrayList<>();
-    protected List<String> solrUrls = new ArrayList<>();
-    protected String zkChroot;
-    protected HttpSolrClient httpClient;
-    protected boolean shardLeadersOnly = true;
-    protected boolean directUpdatesToLeadersOnly = false;
-    protected boolean parallelUpdates = true;
-    protected ClusterStateProvider stateProvider;
-    protected HttpSolrClient.BuilderBase<?, ?> internalClientBuilder;
-    protected RequestWriter requestWriter;
-    protected ResponseParser responseParser;
-    protected long retryExpiryTimeNano =
-        TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3 
million nanos
+  protected CloudSolrClient(
+      boolean updatesToLeaders,
+      boolean parallelUpdates,
+      boolean directUpdatesToLeadersOnly,
+      int stateRefreshThreads) {
+    this.updatesToLeaders = updatesToLeaders;
+    this.parallelUpdates = parallelUpdates;
+    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+    this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
+    this.stateRefreshParallelism = Math.max(1, stateRefreshThreads);
+    this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
+  }
 
-    protected String defaultCollection;
-    protected long timeToLiveSeconds = 60;
-    protected int parallelCacheRefreshesLocks = 
DEFAULT_STATE_REFRESH_PARALLELISM;
-    protected int zkConnectTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
-    protected int zkClientTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
-    protected boolean canUseZkACLs = true;
+  /**
+   * Determines whether an UpdateRequest contains sufficient routing 
information to identify shard
+   * leaders for direct updates when directUpdatesToLeadersOnly is enabled.
+   */
+  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, 
String idField) {
+    final Map<SolrInputDocument, Map<String, Object>> documents = 
updateRequest.getDocumentsMap();
+    final Map<String, Map<String, Object>> deleteById = 
updateRequest.getDeleteByIdMap();
 
-    /**
-     * Provide a series of Solr URLs to be used when configuring {@link 
CloudSolrClient} instances.
-     * The solr client will use these urls to understand the cluster topology, 
which solr nodes are
-     * active etc.
-     *
-     * <p>Provided Solr URLs are expected to point to the root Solr path
-     * ("http://hostname:8983/solr";); they should not include any collections, 
cores, or other path
-     * components.
-     *
-     * <p>Usage example:
-     *
-     * <pre>
-     *   final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
-     *   solrBaseUrls.add("http://solr1:8983/solr";); 
solrBaseUrls.add("http://solr2:8983/solr";); 
solrBaseUrls.add("http://solr3:8983/solr";);
-     *   final SolrClient client = new 
CloudSolrClient.Builder(solrBaseUrls).build();
-     * </pre>
-     */
-    public Builder(List<String> solrUrls) {
-      this.solrUrls = solrUrls;
+    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
+    final boolean hasNoDeleteById = (deleteById == null || 
deleteById.isEmpty());
+    if (hasNoDocuments && hasNoDeleteById) {
+      // no documents and no delete-by-id, so no info to find leader(s)
+      return false;
     }
 
-    /**
-     * Provide a series of ZK hosts which will be used when configuring {@link 
CloudSolrClient}
-     * instances.
-     *
-     * <p>Usage example when Solr stores data at the ZooKeeper root ('/'):
-     *
-     * <pre>
-     *   final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *   zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); 
zkServers.add("zookeeper3:2181");
-     *   final SolrClient client = new CloudSolrClient.Builder(zkServers, 
Optional.empty()).build();
-     * </pre>
-     *
-     * Usage example when Solr data is stored in a ZooKeeper chroot:
-     *
-     * <pre>
-     *    final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *    zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); 
zkServers.add("zookeeper3:2181");
-     *    final SolrClient client = new CloudSolrClient.Builder(zkServers, 
Optional.of("/solr")).build();
-     *  </pre>
-     *
-     * @param zkHosts a List of at least one ZooKeeper host and port (e.g. 
"zookeeper1:2181")
-     * @param zkChroot the path to the root ZooKeeper node containing Solr 
data. Provide {@code
-     *     java.util.Optional.empty()} if no ZK chroot is used.
-     */
-    public Builder(List<String> zkHosts, Optional<String> zkChroot) {
-      this.zkHosts = zkHosts;
-      if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
+    if (documents != null) {
+      for (final Map.Entry<SolrInputDocument, Map<String, Object>> entry : 
documents.entrySet()) {
+        final SolrInputDocument doc = entry.getKey();
+        final Object fieldValue = doc.getFieldValue(idField);
+        if (fieldValue == null) {
+          // a document with no id field value, so can't find leader for it
+          return false;
+        }
+      }
     }
 
-    /** for an expert use-case */
-    public Builder(ClusterStateProvider stateProvider) {
-      this.stateProvider = stateProvider;
+    if (deleteById != null) {
+      for (final Map.Entry<String, Map<String, Object>> entry : 
deleteById.entrySet()) {
+        final Map<String, Object> params = entry.getValue();
+        if (params == null || params.get(ShardParams._ROUTE_) == null) {
+          // deleteById entry lacks explicit route parameter, can't find 
leader for it
+          return false;
+        }
+      }
     }
 
-    /**
-     * Creates a client builder based on a connection string of 2 possible 
formats:
-     *
-     * <ul>
-     *   <li>ZooKeeper connection string (optionally with chroot), e.g. {@code
-     *       zk1:2181,zk2:2181,zk3:2181/solr}
-     *   <li>Comma-separated list of Solr node base URLs (HTTP or HTTPS), e.g. 
{@code
-     *       http://solr1:8983/solr,http://solr2:8983/solr}
-     * </ul>
-     *
-     * @param connectionString a string specifying either ZooKeeper connection 
string or HTTP(S)
-     *     Solr URLs
-     * @throws IllegalArgumentException if string is null, empty, or malformed
-     */
-    public Builder(String connectionString) {
-      this(CloudSolrClientConnection.parse(connectionString));
-    }
+    return true;
+  }
 
-    /**
-     * Creates a client builder from a {@link CloudSolrClientConnection}.
-     *
-     * @param connection instance of {@link CloudSolrClientConnection}, which 
can be obtained from
-     *     the solr connection string or created via the constructor
-     */
-    public Builder(CloudSolrClientConnection connection) {
-      if (connection.isZookeeper()) {
-        this.zkHosts = connection.quorumItems();
-        this.zkChroot = connection.zkChroot();
-      } else {
-        this.solrUrls = connection.quorumItems();
-      }
-    }
+  protected abstract LBSolrClient getLbClient();
 
-    /** Whether to use the default ZK ACLs when building a ZK Client. */
-    public Builder canUseZkACLs(boolean canUseZkACLs) {
-      this.canUseZkACLs = canUseZkACLs;
-      return this;
-    }
+  public abstract ClusterStateProvider getClusterStateProvider();
 
-    /**
-     * Tells {@link Builder} that created clients should be configured such 
that {@link
-     * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
-     *
-     * @see #sendUpdatesToAnyReplica
-     * @see CloudSolrClient#isUpdatesToLeaders
-     */
-    public Builder sendUpdatesOnlyToShardLeaders() {
-      shardLeadersOnly = true;
-      return this;
-    }
+  /**
+   * @deprecated problematic as a 'get' method, since one implementation will 
do a remote request
+   *     each time this is called, potentially return lots of data that isn't 
even needed.
+   */
+  @Deprecated
+  public ClusterState getClusterState() {
+    // The future of "ClusterState" isn't clear.  Could make it more of a 
cache instead of a
+    // snapshot, so we un-deprecate. Or we avoid it and maybe make the 
ClusterStateProvider as that
+    // cache.  SOLR-17604 is related.
+    return getClusterStateProvider().getClusterState();
+  }
 
-    /**
-     * Tells {@link Builder} that created clients should be configured such 
that {@link
-     * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
-     *
-     * @see #sendUpdatesOnlyToShardLeaders
-     * @see CloudSolrClient#isUpdatesToLeaders
-     */
-    public Builder sendUpdatesToAnyReplica() {
-      shardLeadersOnly = false;
-      return this;
-    }
+  /** Is this a communication error? We will retry if so. */
+  protected boolean wasCommError(Throwable t) {
+    return t instanceof SocketException || t instanceof UnknownHostException;
+  }
 
-    /**
-     * Tells {@link CloudSolrClient.Builder} that created clients should send 
direct updates to
-     * shard leaders only.
-     *
-     * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the 
client side with a
-     * {@link SolrException}
-     *
-     * @see #sendDirectUpdatesToAnyShardReplica
-     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
-     */
-    public Builder sendDirectUpdatesToShardLeadersOnly() {
-      directUpdatesToLeadersOnly = true;
-      return this;
+  @Override
+  public void close() {
+    closed = true;
+    collectionRefreshes.clear();
+    if (!ExecutorUtil.isShutdown(this.threadPool)) {
+      ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
     }
+  }
 
-    /**
-     * Tells {@link CloudSolrClient.Builder} that created clients can send 
updates to any shard
-     * replica (shard leaders and non-leaders).
-     *
-     * <p>Shard leaders are still preferred, but the created clients will fall 
back to using other
-     * replicas if a leader cannot be found.
-     *
-     * @see #sendDirectUpdatesToShardLeadersOnly
-     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
-     */
-    public Builder sendDirectUpdatesToAnyShardReplica() {
-      directUpdatesToLeadersOnly = false;
-      return this;
-    }
-
-    /** Provides a {@link RequestWriter} for created clients to use when 
handing requests. */
-    public Builder withRequestWriter(RequestWriter requestWriter) {
-      this.requestWriter = requestWriter;
-      return this;
-    }
-
-    /** Provides a {@link ResponseParser} for created clients to use when 
handling requests. */
-    public Builder withResponseParser(ResponseParser responseParser) {
-      this.responseParser = responseParser;
-      return this;
-    }
-
-    /**
-     * Tells {@link CloudSolrClient.Builder} whether created clients should 
send shard updates
-     * serially or in parallel
-     *
-     * <p>When an {@link UpdateRequest} affects multiple shards, {@link 
CloudSolrClient} splits it
-     * up and sends a request to each affected shard. This setting chooses 
whether those
-     * sub-requests are sent serially or in parallel.
-     *
-     * <p>If not set, this defaults to 'true' and sends sub-requests in 
parallel.
-     */
-    public Builder withParallelUpdates(boolean parallelUpdates) {
-      this.parallelUpdates = parallelUpdates;
-      return this;
-    }
+  public ResponseParser getParser() {
+    return getLbClient().getParser();
+  }
 
-    /**
-     * Configures how many collection state refresh operations may run in 
parallel using a dedicated
-     * thread pool. This controls the maximum number of concurrent 
ZooKeeper/cluster state lookups.
-     *
-     * <p>Defaults to 5.
-     */
-    public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks) 
{
-      this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
-      return this;
-    }
+  public RequestWriter getRequestWriter() {
+    return getLbClient().getRequestWriter();
+  }
 
-    /**
-     * This is the time to wait to re-fetch the state after getting the same 
state version from ZK
-     */
-    public Builder withRetryExpiryTime(long expiryTime, TimeUnit unit) {
-      this.retryExpiryTimeNano = TimeUnit.NANOSECONDS.convert(expiryTime, 
unit);
-      return this;
-    }
+  /** Gets whether direct updates are sent in parallel */
+  public boolean isParallelUpdates() {
+    return parallelUpdates;
+  }
 
-    /** Sets the default collection for request. */
-    public Builder withDefaultCollection(String defaultCollection) {
-      this.defaultCollection = defaultCollection;
-      return this;
-    }
+  /**
+   * Connect to the zookeeper ensemble. This is an optional method that may be 
used to force a
+   * connection before any other requests are sent.
+   *
+   * @deprecated Call {@link ClusterStateProvider#getLiveNodes()} instead.
+   */
+  @Deprecated
+  public void connect() {
+    getClusterStateProvider().connect();
+  }
 
-    /**
-     * Sets the cache ttl for DocCollection Objects cached.
-     *
-     * @param timeToLive ttl value
-     */
-    public Builder withCollectionCacheTtl(long timeToLive, TimeUnit unit) {
-      assert timeToLive > 0;
-      this.timeToLiveSeconds = TimeUnit.SECONDS.convert(timeToLive, unit);
-      return this;
+  /**
+   * Connect to a cluster. If the cluster is not ready, retry connection up to 
a given timeout.
+   *
+   * @param duration the timeout
+   * @param timeUnit the units of the timeout
+   * @throws TimeoutException if the cluster is not ready after the timeout
+   * @throws InterruptedException if the wait is interrupted
+   */
+  @Deprecated
+  public void connect(long duration, TimeUnit timeUnit)
+      throws TimeoutException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info(
+          "Waiting for {} {} for cluster at {} to be ready",
+          duration,
+          timeUnit,
+          getClusterStateProvider());
     }
-
-    /**
-     * Set the internal Solr HTTP client.
-     *
-     * <p>Note: closing the client instance is the responsibility of the 
caller.
-     *
-     * @return this
-     */
-    public Builder withHttpClient(HttpSolrClient httpSolrClient) {
-      if (this.internalClientBuilder != null) {
-        throw new IllegalStateException(
-            "The builder can't accept an httpClient AND an 
internalClientBuilder, only one of those can be provided");
+    long timeout = System.nanoTime() + timeUnit.toNanos(duration);
+    while (System.nanoTime() < timeout) {
+      try {
+        connect();
+        if (log.isInfoEnabled()) {
+          log.info("Cluster at {} ready", getClusterStateProvider());
+        }
+        return;
+      } catch (RuntimeException e) {
+        // not ready yet, then...
       }
-      this.httpClient = httpSolrClient;
-      return this;
+      TimeUnit.MILLISECONDS.sleep(250);
     }
+    throw new TimeoutException("Timed out waiting for cluster");
+  }
 
-    /**
-     * If provided, the CloudSolrClient will build it's internal client using 
this builder (instead
-     * of the empty default one). Providing this builder allows users to 
configure the internal
-     * clients (authentication, timeouts, etc.).
-     *
-     * @param internalClientBuilder the builder to use for creating the 
internal http client.
-     * @return this
-     */
-    public Builder withHttpClientBuilder(HttpSolrClient.BuilderBase<?, ?> 
internalClientBuilder) {
-      if (this.httpClient != null) {
-        throw new IllegalStateException(
-            "The builder can't accept an httpClient AND an 
internalClientBuilder, only one of those can be provided");
-      }
-      this.internalClientBuilder = internalClientBuilder;
-      return this;
-    }
+  @SuppressWarnings({"unchecked"})
+  private NamedList<Object> directUpdate(UpdateRequest request, String 
collection)
+      throws SolrServerException {
+    SolrParams params = request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
 
-    @Deprecated(since = "9.10")
-    public Builder withInternalClientBuilder(
-        HttpSolrClient.BuilderBase<?, ?> internalClientBuilder) {
-      return withHttpClientBuilder(internalClientBuilder);
+    if (params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for (String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    } else {
+      params = new ModifiableSolrParams();
     }
 
-    /**
-     * Sets the Zk connection timeout
-     *
-     * @param zkConnectTimeout timeout value
-     * @param unit time unit
-     */
-    public Builder withZkConnectTimeout(int zkConnectTimeout, TimeUnit unit) {
-      this.zkConnectTimeout = Math.toIntExact(unit.toMillis(zkConnectTimeout));
-      return this;
+    if (collection == null) {
+      throw new SolrServerException(
+          "No collection param specified on request and no default collection 
has been set.");
     }
 
-    /**
-     * Sets the Zk client session timeout
-     *
-     * @param zkClientTimeout timeout value
-     * @param unit time unit
-     */
-    public Builder withZkClientTimeout(int zkClientTimeout, TimeUnit unit) {
-      this.zkClientTimeout = Math.toIntExact(unit.toMillis(zkClientTimeout));
-      return this;
+    // Check to see if the collection is an alias. Updates to multi-collection 
aliases are ok as
+    // long as they are routed aliases
+    List<String> aliasedCollections = new 
ArrayList<>(resolveAliases(List.of(collection)));
+    if (aliasedCollections.size() == 1 || 
getClusterStateProvider().isRoutedAlias(collection)) {
+      collection = aliasedCollections.get(0); // pick 1st (consistent with 
HttpSolrCall behavior)
+    } else {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "Update request to non-routed multi-collection alias not supported: "
+              + collection
+              + " -> "
+              + aliasedCollections);
     }
 
-    /** Create a {@link CloudSolrClient} based on the provided configuration. 
*/
-    public CloudHttp2SolrClient build() {
-      int providedOptions = 0;
-      if (!zkHosts.isEmpty()) providedOptions++;
-      if (!solrUrls.isEmpty()) providedOptions++;
-      if (stateProvider != null) providedOptions++;
+    DocCollection col = getDocCollection(collection, null);
 
-      if (providedOptions > 1) {
-        throw new IllegalArgumentException(
-            "Only one of zkHost(s), solrUrl(s), or stateProvider should be 
specified.");
-      } else if (providedOptions == 0) {
-        throw new IllegalArgumentException(
-            "One of zkHosts, solrUrls, or stateProvider must be specified.");
-      }
+    DocRouter router = col.getRouter();
 
-      return new CloudHttp2SolrClient(this);
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
     }
 
-    protected HttpSolrClient createOrGetHttpClient() {
-      if (httpClient != null) {
-        return httpClient;
-      } else if (internalClientBuilder != null) {
-        return internalClientBuilder.build();
+    ReplicaListTransformer replicaListTransformer =
+        requestRLTGenerator.getReplicaListTransformer(params);
+
+    // Create the URL map, which is keyed on slice name.
+    // The value is a list of URLs for each replica in the slice.
+    // The first value in the list is the leader for the slice.
+    final Map<String, List<String>> urlMap = buildUrlMap(col, 
replicaListTransformer);
+    String routeField =
+        (col.getRouter().getRouteField(col) == null) ? ID : 
col.getRouter().getRouteField(col);
+    final Map<String, ? extends LBSolrClient.Req> routes =
+        createRoutes(request, routableParams, col, router, urlMap, routeField);
+    if (routes == null) {
+      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(request, 
routeField)) {
+        // we have info (documents with ids and/or ids to delete) with
+        // which to find the leaders, but we could not find (all of) them
+        throw new SolrException(
+            SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+            "directUpdatesToLeadersOnly==true but could not find leader(s)");
       } else {
-        return HttpSolrClient.builder(null).build();
+        // we could not find a leader or routes yet - use unoptimized general 
path
+        log.warn(
+            "No routing info found for update to collection '{}', broadcasting 
to all shards.",
+            collection);
+        return null;
       }
     }
 
-    protected LBSolrClient createOrGetLbClient(HttpSolrClient myClient) {
-      return myClient.createLBSolrClient();
-    }
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final NamedList<NamedList<?>> shardResponses =
+        new NamedList<>(routes.size() + 1); // +1 for deleteQuery
 
-    protected ClusterStateProvider createZkClusterStateProvider() {
-      ClusterStateProvider stateProvider =
-          ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot, 
canUseZkACLs);
-      if (stateProvider instanceof 
SolrZkClientTimeout.SolrZkClientTimeoutAware timeoutAware) {
-        timeoutAware.setZkClientTimeout(zkClientTimeout);
-        timeoutAware.setZkConnectTimeout(zkConnectTimeout);
-      }
-      return stateProvider;
-    }
+    long start = System.nanoTime();
 
-    protected ClusterStateProvider 
createHttpClusterStateProvider(HttpSolrClient httpClient) {
-      try {
-        return new HttpClusterStateProvider<>(solrUrls, httpClient);
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Couldn't initialize a HttpClusterStateProvider (is/are the "
-                + "Solr server(s), "
-                + solrUrls
-                + ", down?)",
-            e);
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures =
+          CollectionUtil.newHashMap(routes.size());
+      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBSolrClient.Req lbRequest = entry.getValue();
+        try {
+          MDC.put("CloudSolrClient.url", url);
+          responseFutures.put(
+              url,
+              threadPool.submit(
+                  () -> {
+                    return getLbClient().request(lbRequest).getResponse();
+                  }));
+        } finally {
+          MDC.remove("CloudSolrClient.url");
+        }
       }
-    }
-  }
 
-  protected static class StateCache extends ConcurrentHashMap<String, 
ExpiringCachedDocCollection> {
-    final AtomicLong puts = new AtomicLong();
-    final AtomicLong hits = new AtomicLong();
-    final Lock evictLock = new ReentrantLock(true);
-    public volatile long timeToLiveMs = 60 * 1000L;
+      for (final Map.Entry<String, Future<NamedList<?>>> entry : 
responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          exceptions.add(url, e.getCause());
+        }
+      }
 
-    @Override
-    public ExpiringCachedDocCollection get(Object key) {
-      ExpiringCachedDocCollection val = super.get(key);
-      if (val == null) {
-        // a new collection is likely to be added now.
-        // check if there are stale items and remove them
-        evictStale();
-        return null;
+      if (exceptions.size() > 0) {
+        Throwable firstException = exceptions.getVal(0);
+        if (firstException instanceof SolrException e) {
+          throw getRouteException(
+              SolrException.ErrorCode.getErrorCode(e.code()), exceptions, 
routes);
+        } else {
+          throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, 
exceptions, routes);
+        }
       }
-      if (val.isExpired(timeToLiveMs)) {
-        super.remove(key);
-        return null;
+    } else {
+      for (Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+        String url = entry.getKey();
+        LBSolrClient.Req lbRequest = entry.getValue();
+        try {
+          NamedList<Object> rsp = 
getLbClient().request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (Exception e) {
+          if (e instanceof SolrException) {
+            throw (SolrException) e;
+          } else {
+            throw new SolrServerException(e);
+          }
+        }
       }
-      hits.incrementAndGet();
-      return val;
     }
 
-    ExpiringCachedDocCollection peek(Object key) {
-      return super.get(key);
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = request.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
     }
 
-    @Override
-    public ExpiringCachedDocCollection put(String key, 
ExpiringCachedDocCollection value) {
-      puts.incrementAndGet();
-      return super.put(key, value);
-    }
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
 
-    void evictStale() {
-      if (!evictLock.tryLock()) return;
+    Set<String> intersection = new HashSet<>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      nonRoutableRequest.setBasicAuthCredentials(
+          request.getBasicAuthUser(), request.getBasicAuthPassword());
+      final var endpoints =
+          routes.keySet().stream()
+              .map(url -> LBSolrClient.Endpoint.from(url))
+              .collect(Collectors.toList());
+      Collections.shuffle(endpoints, rand);
+      LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest, 
endpoints);
       try {
-        for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
-          if (e.getValue().isExpired(timeToLiveMs)) {
-            super.remove(e.getKey());
-          }
-        }
-      } finally {
-        evictLock.unlock();
+        LBSolrClient.Rsp rsp = getLbClient().request(req);
+        shardResponses.add(endpoints.get(0).toString(), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR, endpoints.get(0).toString(), 
e);
       }
     }
-  }
 
-  protected final StateCache collectionStateCache = new StateCache();
+    long end = System.nanoTime();
 
-  class ExpiringCachedDocCollection {
-    final DocCollection cached;
-    final long cachedAtNano;
-    // This is the time at which the collection is retried and got the same 
old version
-    volatile long retriedAtNano = -1;
-    // flag that suggests that this is potentially to be rechecked
-    volatile boolean maybeStale = false;
+    @SuppressWarnings({"rawtypes"})
+    RouteResponse rr =
+        condenseResponse(
+            shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, 
TimeUnit.NANOSECONDS));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
 
-    ExpiringCachedDocCollection(DocCollection cached) {
-      this.cached = cached;
-      this.cachedAtNano = System.nanoTime();
-    }
+  protected RouteException getRouteException(
+      SolrException.ErrorCode serverError,
+      NamedList<Throwable> exceptions,
+      Map<String, ? extends LBSolrClient.Req> routes) {
+    return new RouteException(serverError, exceptions, routes);
+  }
 
-    boolean isExpired(long timeToLiveMs) {
-      return (System.nanoTime() - cachedAtNano)
-          > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
-    }
+  protected Map<String, ? extends LBSolrClient.Req> createRoutes(
+      UpdateRequest updateRequest,
+      ModifiableSolrParams routableParams,
+      DocCollection col,
+      DocRouter router,
+      Map<String, List<String>> urlMap,
+      String routeField) {
+    return urlMap == null
+        ? null
+        : updateRequest.getRoutesToCollection(router, col, urlMap, 
routableParams, routeField);
+  }
 
-    boolean shouldRetry() {
-      if (maybeStale) { // we are not sure if it is stale so check with retry 
time
-        if ((retriedAtNano == -1 || (System.nanoTime() - retriedAtNano) > 
retryExpiryTimeNano)) {
-          return true; // we retried a while back. and we could not get 
anything new.
-          // it's likely that it is not going to be available now also.
+  private Map<String, List<String>> buildUrlMap(
+      DocCollection col, ReplicaListTransformer replicaListTransformer) {
+    Map<String, List<String>> urlMap = new HashMap<>();
+    Collection<Slice> slices = col.getActiveSlices();
+    Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
+    for (Slice slice : slices) {
+      String name = slice.getName();
+      List<Replica> sortedReplicas = new ArrayList<>();
+      Replica leader = slice.getLeader();
+      if (directUpdatesToLeadersOnly && leader == null) {
+        for (Replica replica :
+            slice.getReplicas(
+                replica -> replica.isActive(liveNodes) && replica.getType() == 
Replica.Type.NRT)) {
+          leader = replica;
+          break;
         }
       }
-      return false;
-    }
-
-    void setRetriedAt() {
-      retriedAtNano = System.nanoTime();
-    }
-
-    /**
-     * Marks this entry as {@code maybeStale} if the provided backoff window 
has elapsed since the
-     * last retry.
-     *
-     * @return {@code true} if the entry was flagged as maybe stale
-     */
-    boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
-      if (maybeStale) {
-        return true;
+      if (leader == null) {
+        if (directUpdatesToLeadersOnly) {
+          continue;
+        }
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
       }
-      long lastRetry = retriedAtNano;
-      if (lastRetry != -1 && (System.nanoTime() - lastRetry) <= 
retryBackoffNano) {
-        return false;
+
+      if (!directUpdatesToLeadersOnly) {
+        for (Replica replica : slice.getReplicas()) {
+          if (!replica.equals(leader)) {
+            sortedReplicas.add(replica);
+          }
+        }
       }
-      maybeStale = true;
-      return true;
-    }
-  }
 
-  protected CloudSolrClient(
-      boolean updatesToLeaders, boolean parallelUpdates, boolean 
directUpdatesToLeadersOnly) {
-    this(
-        updatesToLeaders,
-        parallelUpdates,
-        directUpdatesToLeadersOnly,
-        DEFAULT_STATE_REFRESH_PARALLELISM);
-  }
+      // Sort the non-leader replicas according to the request parameters
+      replicaListTransformer.transform(sortedReplicas);
 
-  protected CloudSolrClient(
-      boolean updatesToLeaders,
-      boolean parallelUpdates,
-      boolean directUpdatesToLeadersOnly,
-      int stateRefreshThreads) {
-    this.updatesToLeaders = updatesToLeaders;
-    this.parallelUpdates = parallelUpdates;
-    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
-    this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
-    this.stateRefreshParallelism = Math.max(1, stateRefreshThreads);
-    this.stateRefreshSemaphore = new Semaphore(this.stateRefreshParallelism);
+      // put the leaderUrl first.
+      sortedReplicas.add(0, leader);
+
+      urlMap.put(
+          name, 
sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
+    }
+    return urlMap;
   }
 
-  protected abstract LBSolrClient getLbClient();
+  protected <T extends RouteResponse<?>> T condenseResponse(
+      NamedList<?> response, int timeMillis, Supplier<T> supplier) {
+    T condensed = supplier.get();
+    int status = 0;
+    Integer rf = null;
 
-  public abstract ClusterStateProvider getClusterStateProvider();
+    // TolerantUpdateProcessor
+    List<SimpleOrderedMap<String>> toleratedErrors = null;
+    int maxToleratedErrors = Integer.MAX_VALUE;
 
-  /**
-   * @deprecated problematic as a 'get' method, since one implementation will 
do a remote request
-   *     each time this is called, potentially return lots of data that isn't 
even needed.
-   */
-  @Deprecated
-  public ClusterState getClusterState() {
-    // The future of "ClusterState" isn't clear.  Could make it more of a 
cache instead of a
-    // snapshot, so we un-deprecate. Or we avoid it and maybe make the 
ClusterStateProvider as that
-    // cache.  SOLR-17604 is related.
-    return getClusterStateProvider().getClusterState();
-  }
+    // For "adds", "deletes", "deleteByQuery" etc.
+    Map<String, NamedList<Object>> versions = new HashMap<>();
 
-  /** Is this a communication error? We will retry if so. */
-  protected boolean wasCommError(Throwable t) {
-    return t instanceof SocketException || t instanceof UnknownHostException;
-  }
+    for (int i = 0; i < response.size(); i++) {
+      NamedList<?> shardResponse = (NamedList<?>) response.getVal(i);
+      NamedList<?> header = (NamedList<?>) shardResponse.get("responseHeader");
+      Integer shardStatus = (Integer) header.get("status");
+      int s = shardStatus.intValue();
+      if (s > 0) {
+        status = s;
+      }
+      Object rfObj = header.get(UpdateRequest.REPFACT);
+      if (rfObj != null && rfObj instanceof Integer routeRf) {
+        if (rf == null || routeRf < rf) rf = routeRf;
+      }
 
-  @Override
-  public void close() {
-    closed = true;
-    collectionRefreshes.clear();
-    if (!ExecutorUtil.isShutdown(this.threadPool)) {
-      ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
+      @SuppressWarnings("unchecked")
+      List<SimpleOrderedMap<String>> shardTolerantErrors =
+          (List<SimpleOrderedMap<String>>) header.get("errors");
+      if (null != shardTolerantErrors) {
+        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+        assert null != shardMaxToleratedErrors
+            : "TolerantUpdateProcessor reported errors but not maxErrors";
+        // if we get into some weird state where the nodes disagree about the 
effective maxErrors,
+        // assume the min value seen to decide if we should fail.
+        maxToleratedErrors =
+            Math.min(
+                maxToleratedErrors,
+                
ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
+        if (null == toleratedErrors) {
+          toleratedErrors = new 
ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+        }
+        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
+          toleratedErrors.add(err);
+        }
+      }
+      for (String updateType : Arrays.asList("adds", "deletes", 
"deleteByQuery")) {
+        Object obj = shardResponse.get(updateType);
+        if (obj instanceof NamedList<?> nl) {
+          NamedList<Object> versionsList =
+              versions.containsKey(updateType) ? versions.get(updateType) : 
new NamedList<>();
+          versionsList.addAll(nl);
+          versions.put(updateType, versionsList);
+        }
+      }
     }
-  }
 
-  public ResponseParser getParser() {
-    return getLbClient().getParser();
-  }
+    NamedList<Object> cheader = new NamedList<>();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    if (rf != null) cheader.add(UpdateRequest.REPFACT, rf);
+    if (null != toleratedErrors) {
+      cheader.add("maxErrors", 
ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+      cheader.add("errors", toleratedErrors);
+      if (maxToleratedErrors < toleratedErrors.size()) {
+        // cumulative errors are too high, we need to throw a client exception 
w/correct metadata
 
-  public RequestWriter getRequestWriter() {
-    return getLbClient().getRequestWriter();
+        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), 
because if that were the
+        // case then at least one shard should have thrown a real error before 
this, so we don't
+        // worry about having a more "singular" exception msg for that 
situation
+        StringBuilder msgBuf =
+            new StringBuilder()
+                .append(toleratedErrors.size())
+                .append(" Async failures during distributed update: ");
+
+        NamedList<String> metadata = new NamedList<>();
+        for (SimpleOrderedMap<String> err : toleratedErrors) {
+          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+
+          msgBuf.append("\n").append(te.getMessage());
+        }
+
+        SolrException toThrow =
+            new SolrException(SolrException.ErrorCode.BAD_REQUEST, 
msgBuf.toString());
+        toThrow.setMetadata(metadata);
+        throw toThrow;
+      }
+    }
+    for (Map.Entry<String, NamedList<Object>> entry : versions.entrySet()) {
+      condensed.add(entry.getKey(), entry.getValue());
+    }
+    condensed.add("responseHeader", cheader);
+    return condensed;
   }
 
-  /** Gets whether direct updates are sent in parallel */
-  public boolean isParallelUpdates() {
-    return parallelUpdates;
+  @SuppressWarnings({"rawtypes"})
+  public RouteResponse condenseResponse(NamedList<?> response, int timeMillis) 
{
+    return condenseResponse(response, timeMillis, RouteResponse::new);
   }
 
-  /**
-   * Connect to the zookeeper ensemble. This is an optional method that may be 
used to force a
-   * connection before any other requests are sent.
-   *
-   * @deprecated Call {@link ClusterStateProvider#getLiveNodes()} instead.
-   */
-  @Deprecated
-  public void connect() {
-    getClusterStateProvider().connect();
+  @Override
+  public NamedList<Object> request(SolrRequest<?> request, String collection)
+      throws SolrServerException, IOException {
+    // the collection parameter of the request overrides that of the parameter 
to this method
+    String requestCollection = request.getCollection();
+    if (requestCollection != null) {
+      collection = requestCollection;
+    } else if (collection == null) {
+      collection = defaultCollection;
+    }
+
+    List<String> inputCollections =
+        collection == null ? List.of() : StrUtils.splitSmart(collection, ",", 
true);
+    return requestWithRetryOnStaleState(
+        request,
+        0,
+        inputCollections,
+        /*skipStateVersion*/ false,
+        Map.of(),
+        /*waitedForRefresh*/ false);
   }
 
   /**
-   * Connect to a cluster. If the cluster is not ready, retry connection up to 
a given timeout.
-   *
-   * @param duration the timeout
-   * @param timeUnit the units of the timeout
-   * @throws TimeoutException if the cluster is not ready after the timeout
-   * @throws InterruptedException if the wait is interrupted
+   * As this class doesn't watch external collections on the client side, 
there's a chance that the
+   * request will fail due to cached stale state, which means the state must 
be refreshed from ZK
+   * and retried.
    */
-  @Deprecated
-  public void connect(long duration, TimeUnit timeUnit)
-      throws TimeoutException, InterruptedException {
-    if (log.isInfoEnabled()) {
-      log.info(
-          "Waiting for {} {} for cluster at {} to be ready",
-          duration,
-          timeUnit,
-          getClusterStateProvider());
+  protected NamedList<Object> requestWithRetryOnStaleState(
+      SolrRequest<?> request,
+      int retryCount,
+      List<String> inputCollections,
+      boolean skipStateVersion,
+      Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
+      boolean waitedForRefresh)
+      throws SolrServerException, IOException {
+    // build up a _stateVer_ param to pass to the server containing all the
+    // external collection state versions involved in this request, which 
allows
+    // the server to notify us that our cached state for one or more of the 
external
+    // collections is stale and needs to be refreshed ... this code has no 
impact on internal
+    // collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    boolean isCollectionRequestOfV2 = false;
+    if (request instanceof V2Request) {
+      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
     }
-    long timeout = System.nanoTime() + timeUnit.toNanos(duration);
-    while (System.nanoTime() < timeout) {
-      try {
-        connect();
-        if (log.isInfoEnabled()) {
-          log.info("Cluster at {} ready", getClusterStateProvider());
+    boolean isAdmin =
+        request.getRequestType() == SolrRequestType.ADMIN && 
!request.requiresCollection();
+    if (!inputCollections.isEmpty()
+        && !isAdmin
+        && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for 
admin, v2 api requests
+      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the 
_stateVer_ param
+        DocCollection coll = getDocCollection(requestedCollection, null);
+        if (coll == null) {
+          throw new SolrException(
+              SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + 
requestedCollection);
         }
-        return;
-      } catch (RuntimeException e) {
-        // not ready yet, then...
-      }
-      TimeUnit.MILLISECONDS.sleep(250);
-    }
-    throw new TimeoutException("Timed out waiting for cluster");
-  }
+        int collVer = coll.getZNodeVersion();
+        if (requestedCollections == null)
+          requestedCollections = new 
ArrayList<>(requestedCollectionNames.size());
+        requestedCollections.add(coll);
 
-  @SuppressWarnings({"unchecked"})
-  private NamedList<Object> directUpdate(UpdateRequest request, String 
collection)
-      throws SolrServerException {
-    SolrParams params = request.getParams();
-    ModifiableSolrParams routableParams = new ModifiableSolrParams();
-    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+        if (stateVerParamBuilder == null) {
+          stateVerParamBuilder = new StringBuilder();
+        } else {
+          stateVerParamBuilder.append(
+              "|"); // hopefully pipe is not an allowed char in a collection 
name
+        }
 
-    if (params != null) {
-      nonRoutableParams.add(params);
-      routableParams.add(params);
-      for (String param : NON_ROUTABLE_PARAMS) {
-        routableParams.remove(param);
+        
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
       }
-    } else {
-      params = new ModifiableSolrParams();
-    }
 
-    if (collection == null) {
-      throw new SolrServerException(
-          "No collection param specified on request and no default collection 
has been set.");
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
     }
 
-    // Check to see if the collection is an alias. Updates to multi-collection 
aliases are ok as
-    // long as they are routed aliases
-    List<String> aliasedCollections = new 
ArrayList<>(resolveAliases(List.of(collection)));
-    if (aliasedCollections.size() == 1 || 
getClusterStateProvider().isRoutedAlias(collection)) {
-      collection = aliasedCollections.get(0); // pick 1st (consistent with 
HttpSolrCall behavior)
-    } else {
-      throw new SolrException(
-          SolrException.ErrorCode.BAD_REQUEST,
-          "Update request to non-routed multi-collection alias not supported: "
-              + collection
-              + " -> "
-              + aliasedCollections);
-    }
-
-    DocCollection col = getDocCollection(collection, null);
-
-    DocRouter router = col.getRouter();
-
-    if (router instanceof ImplicitDocRouter) {
-      // short circuit as optimization
-      return null;
-    }
-
-    ReplicaListTransformer replicaListTransformer =
-        requestRLTGenerator.getReplicaListTransformer(params);
-
-    // Create the URL map, which is keyed on slice name.
-    // The value is a list of URLs for each replica in the slice.
-    // The first value in the list is the leader for the slice.
-    final Map<String, List<String>> urlMap = buildUrlMap(col, 
replicaListTransformer);
-    String routeField =
-        (col.getRouter().getRouteField(col) == null) ? ID : 
col.getRouter().getRouteField(col);
-    final Map<String, ? extends LBSolrClient.Req> routes =
-        createRoutes(request, routableParams, col, router, urlMap, routeField);
-    if (routes == null) {
-      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(request, 
routeField)) {
-        // we have info (documents with ids and/or ids to delete) with
-        // which to find the leaders, but we could not find (all of) them
-        throw new SolrException(
-            SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-            "directUpdatesToLeadersOnly==true but could not find leader(s)");
+    if (request.getParams() instanceof ModifiableSolrParams params) {
+      if (!skipStateVersion && stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
       } else {
-        // we could not find a leader or routes yet - use unoptimized general 
path
-        log.warn(
-            "No routing info found for update to collection '{}', broadcasting 
to all shards.",
-            collection);
-        return null;
+        params.remove(STATE_VERSION);
       }
-    }
-
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    final NamedList<NamedList<?>> shardResponses =
-        new NamedList<>(routes.size() + 1); // +1 for deleteQuery
+    } // else: ??? how to set this ???
 
-    long start = System.nanoTime();
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request, inputCollections);
+      // to avoid an O(n) operation we always add STATE_VERSION to the last 
and try to read it from
+      // there
+      Object o = resp == null || resp.size() == 0 ? null : 
resp.get(STATE_VERSION, resp.size() - 1);
+      if (o != null && o instanceof Map<?, ?> invalidStates) {
+        // remove this because no one else needs this and tests would fail if 
they are comparing
+        // responses
+        resp.remove(resp.size() - 1);
+        for (Map.Entry<?, ?> e : invalidStates.entrySet()) {
+          getDocCollection((String) e.getKey(), (Integer) e.getValue());
+        }
+      }
+    } catch (Exception exc) {
 
-    if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures =
-          CollectionUtil.newHashMap(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(
-              url,
-              threadPool.submit(
-                  () -> {
-                    return getLbClient().request(lbRequest).getResponse();
-                  }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests
+      // or if the request doesn't have a collection specified
+      // or request is v2 api and its method is not GET
+      if (inputCollections.isEmpty()
+          || isAdmin
+          || (request.getApiVersion() == SolrRequest.ApiVersion.V2
+              && request.getMethod() != SolrRequest.METHOD.GET)) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException) exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException) exc;
+        } else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        } else {
+          throw new SolrServerException(rootCause);
         }
       }
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry : 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      int errorCode =
+          (rootCause instanceof SolrException)
+              ? ((SolrException) rootCause).code()
+              : SolrException.ErrorCode.UNKNOWN.code;
+
+      final boolean wasCommError = wasCommError(rootCause);
+
+      if (wasCommError
+          || (exc instanceof RouteException
+              && (errorCode == 503)) // 404 because the core does not exist 
503 service unavailable
+      // TODO there are other reasons for 404. We need to change the solr 
response format from HTML
+      // to structured data to know that
+      ) {
+        // it was a communication error. it is likely that
+        // the node to which the request to be sent is down . So , expire the 
state
+        // so that the next attempt would fetch the fresh state
+        // just re-read state for all of them, if it has not been retried
+        // in retryExpiryTime time
+        if (requestedCollections != null) {
+          for (DocCollection ext : requestedCollections) {
+            String name = ext.getName();
+            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(name);
+            if (cacheEntry != null) {
+              if (wasCommError) {
+                cacheEntry.maybeStale = true;
+              } else {
+                boolean markedStale =
+                    
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
+                if (markedStale && cacheEntry.shouldRetry()) {
+                  triggerCollectionRefresh(name);
+                }
+              }
+            } else {
+              triggerCollectionRefresh(name);
+            }
+          }
+        }
+        if (retryCount < MAX_STALE_RETRIES) { // if it is a communication 
error , we must try again
+          // may be, we have a stale version of the collection state,
+          // and we could not get any information from the server
+          // it is probably not worth trying again and again because
+          // the state would not have been updated
+          log.info(
+              "Request to collection {} failed due to ({}) {}, retry={} 
maxRetries={} commError={} errorCode={} - retrying",
+              inputCollections,
+              errorCode,
+              rootCause,
+              retryCount,
+              MAX_STALE_RETRIES,
+              wasCommError,
+              errorCode);
+          return requestWithRetryOnStaleState(
+              request,
+              retryCount + 1,
+              inputCollections,
+              skipStateVersion,
+              pendingRefreshes,
+              waitedForRefresh);
         }
+      } else {
+        log.info("request was not communication error it seems");
       }
+      log.info(
+          "Request to collection {} failed due to ({}) {}, retry={} 
maxRetries={} commError={} errorCode={} ",
+          inputCollections,
+          errorCode,
+          rootCause,
+          retryCount,
+          MAX_STALE_RETRIES,
+          wasCommError,
+          errorCode);
 
-      if (exceptions.size() > 0) {
-        Throwable firstException = exceptions.getVal(0);
-        if (firstException instanceof SolrException e) {
-          throw getRouteException(
-              SolrException.ErrorCode.getErrorCode(e.code()), exceptions, 
routes);
-        } else {
-          throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, 
exceptions, routes);
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES
+          && requestedCollections != null
+          && !requestedCollections.isEmpty()
+          && (SolrException.ErrorCode.getErrorCode(errorCode)
+                  == SolrException.ErrorCode.INVALID_STATE
+              || errorCode == 404)) {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy-handed 
but hopefully a rare
+        // occurrence
+        for (DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
         }
       }
-    } else {
-      for (Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        String url = entry.getKey();
-        LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          NamedList<Object> rsp = 
getLbClient().request(lbRequest).getResponse();
-          shardResponses.add(url, rsp);
-        } catch (Exception e) {
-          if (e instanceof SolrException) {
-            throw (SolrException) e;
-          } else {
-            throw new SolrServerException(e);
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part 
of the collection
+      if (retryCount < MAX_STALE_RETRIES
+          && !stateWasStale
+          && requestedCollections != null
+          && !requestedCollections.isEmpty()
+          && wasCommError) {
+        for (DocCollection ext : requestedCollections) {
+          DocCollection latestStateFromZk = getDocCollection(ext.getName(), 
null);
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was 
stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the 
retry uses it
+            collectionStateCache.put(
+                ext.getName(), new 
ExpiringCachedDocCollection(latestStateFromZk));
           }
         }
       }
-    }
 
-    UpdateRequest nonRoutableRequest = null;
-    List<String> deleteQuery = request.getDeleteQuery();
-    if (deleteQuery != null && deleteQuery.size() > 0) {
-      UpdateRequest deleteQueryRequest = new UpdateRequest();
-      deleteQueryRequest.setDeleteQuery(deleteQuery);
-      nonRoutableRequest = deleteQueryRequest;
-    }
-
-    Set<String> paramNames = nonRoutableParams.getParameterNames();
+      // if the state was stale, then we retry the request once with new state 
pulled from Zk
+      if (stateWasStale) {
+        log.warn(
+            "Re-trying request to collection(s) {} after stale state error 
from server.",
+            inputCollections);
 
-    Set<String> intersection = new HashSet<>(paramNames);
-    intersection.retainAll(NON_ROUTABLE_PARAMS);
+        Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor = 
pendingRefreshes;
+        if (!waitedForRefresh && (pendingRefreshes == null || 
pendingRefreshes.isEmpty())) {
+          refreshesToWaitFor = new HashMap<>();
+          for (DocCollection ext : requestedCollections) {
+            refreshesToWaitFor.put(ext.getName(), 
triggerCollectionRefresh(ext.getName()));
+          }
+        }
 
-    if (nonRoutableRequest != null || intersection.size() > 0) {
-      if (nonRoutableRequest == null) {
-        nonRoutableRequest = new UpdateRequest();
-      }
-      nonRoutableRequest.setParams(nonRoutableParams);
-      nonRoutableRequest.setBasicAuthCredentials(
-          request.getBasicAuthUser(), request.getBasicAuthPassword());
-      final var endpoints =
-          routes.keySet().stream()
-              .map(url -> LBSolrClient.Endpoint.from(url))
-              .collect(Collectors.toList());
-      Collections.shuffle(endpoints, rand);
-      LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest, 
endpoints);
-      try {
-        LBSolrClient.Rsp rsp = getLbClient().request(req);
-        shardResponses.add(endpoints.get(0).toString(), rsp.getResponse());
-      } catch (Exception e) {
-        throw new SolrException(
-            SolrException.ErrorCode.SERVER_ERROR, endpoints.get(0).toString(), 
e);
+        // First retry without sending state versions so the server does not 
immediately reject the
+        // request while we intentionally rely on stale routing (e.g., to 
allow forwarding to a new
+        // leader) as the background refresh completes.
+        if (!skipStateVersion && !waitedForRefresh) {
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ true,
+                  refreshesToWaitFor,
+                  waitedForRefresh);
+        } else if (!waitedForRefresh
+            && refreshesToWaitFor != null
+            && !refreshesToWaitFor.isEmpty()) {
+          for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
+              refreshesToWaitFor.entrySet()) {
+            waitForCollectionRefresh(entry.getKey(), entry.getValue());
+          }
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ false,
+                  Map.of(),
+                  /*waitedForRefresh*/ true);
+        } else {
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ false,
+                  Map.of(),
+                  /*waitedForRefresh*/ waitedForRefresh);
+        }
+      } else {
+        if (exc instanceof SolrException
+            || exc instanceof SolrServerException
+            || exc instanceof IOException) {
+          throw exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
       }
-    }
 
-    long end = System.nanoTime();
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
+    }
 
-    @SuppressWarnings({"rawtypes"})
-    RouteResponse rr =
-        condenseResponse(
-            shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, 
TimeUnit.NANOSECONDS));
-    rr.setRouteResponses(shardResponses);
-    rr.setRoutes(routes);
-    return rr;
+    return resp;
   }
 
-  protected RouteException getRouteException(
-      SolrException.ErrorCode serverError,
-      NamedList<Throwable> exceptions,
-      Map<String, ? extends LBSolrClient.Req> routes) {
-    return new RouteException(serverError, exceptions, routes);
-  }
+  protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String> 
inputCollections)
+      throws SolrServerException, IOException {
+    boolean sendToLeaders = false;
 
-  protected Map<String, ? extends LBSolrClient.Req> createRoutes(
-      UpdateRequest updateRequest,
-      ModifiableSolrParams routableParams,
-      DocCollection col,
-      DocRouter router,
-      Map<String, List<String>> urlMap,
-      String routeField) {
-    return urlMap == null
-        ? null
-        : updateRequest.getRoutesToCollection(router, col, urlMap, 
routableParams, routeField);
-  }
+    if (request.getRequestType() == SolrRequestType.UPDATE) {
+      sendToLeaders = this.isUpdatesToLeaders();
 
-  private Map<String, List<String>> buildUrlMap(
-      DocCollection col, ReplicaListTransformer replicaListTransformer) {
-    Map<String, List<String>> urlMap = new HashMap<>();
-    Collection<Slice> slices = col.getActiveSlices();
-    Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
-    for (Slice slice : slices) {
-      String name = slice.getName();
-      List<Replica> sortedReplicas = new ArrayList<>();
-      Replica leader = slice.getLeader();
-      if (directUpdatesToLeadersOnly && leader == null) {
-        for (Replica replica :
-            slice.getReplicas(
-                replica -> replica.isActive(liveNodes) && replica.getType() == 
Replica.Type.NRT)) {
-          leader = replica;
-          break;
-        }
-      }
-      if (leader == null) {
-        if (directUpdatesToLeadersOnly) {
-          continue;
-        }
-        // take unoptimized general path - we cannot find a leader yet
-        return null;
-      }
+      if (sendToLeaders && request instanceof UpdateRequest updateRequest) {
+        sendToLeaders = sendToLeaders && updateRequest.isSendToLeaders();
 
-      if (!directUpdatesToLeadersOnly) {
-        for (Replica replica : slice.getReplicas()) {
-          if (!replica.equals(leader)) {
-            sortedReplicas.add(replica);
+        // Check if we can do a "directUpdate" ...
+        if (sendToLeaders) {
+          if (inputCollections.size() > 1) {
+            throw new SolrException(
+                SolrException.ErrorCode.BAD_REQUEST,
+                "Update request must be sent to a single collection "
+                    + "or an alias: "
+                    + inputCollections);
+          }
+          String collection =
+              inputCollections.isEmpty()
+                  ? null
+                  : inputCollections.get(0); // getting first mimics 
HttpSolrCall
+          NamedList<Object> response = directUpdate(updateRequest, collection);
+          if (response != null) {
+            return response;
           }
         }
       }
+    }
 
-      // Sort the non-leader replicas according to the request parameters
-      replicaListTransformer.transform(sortedReplicas);
-
-      // put the leaderUrl first.
-      sortedReplicas.add(0, leader);
+    SolrParams reqParams = request.getParams();
+    assert reqParams != null;
 
-      urlMap.put(
-          name, 
sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
-    }
-    return urlMap;
-  }
+    ReplicaListTransformer replicaListTransformer =
+        requestRLTGenerator.getReplicaListTransformer(reqParams);
 
-  protected <T extends RouteResponse<?>> T condenseResponse(
-      NamedList<?> response, int timeMillis, Supplier<T> supplier) {
-    T condensed = supplier.get();
-    int status = 0;
-    Integer rf = null;
+    final ClusterStateProvider provider = getClusterStateProvider();
+    final String urlScheme = provider.getUrlScheme();
+    final Set<String> liveNodes = provider.getLiveNodes();
 
-    // TolerantUpdateProcessor
-    List<SimpleOrderedMap<String>> toleratedErrors = null;
-    int maxToleratedErrors = Integer.MAX_VALUE;
+    final List<LBSolrClient.Endpoint> requestEndpoints =
+        new ArrayList<>(); // we populate this as follows...
 
-    // For "adds", "deletes", "deleteByQuery" etc.
-    Map<String, NamedList<Object>> versions = new HashMap<>();
+    if (request.getApiVersion() == SolrRequest.ApiVersion.V2) {
+      if (!liveNodes.isEmpty()) {
+        List<String> liveNodesList = new ArrayList<>(liveNodes);
+        Collections.shuffle(liveNodesList, rand);
+        final var chosenNodeUrl = 
Utils.getBaseUrlForNodeName(liveNodesList.get(0), urlScheme);
+        requestEndpoints.add(new LBSolrClient.Endpoint(chosenNodeUrl));
+      }
 
-    for (int i = 0; i < response.size(); i++) {
-      NamedList<?> shardResponse = (NamedList<?>) response.getVal(i);
-      NamedList<?> header = (NamedList<?>) shardResponse.get("responseHeader");
-      Integer shardStatus = (Integer) header.get("status");
-      int s = shardStatus.intValue();
-      if (s > 0) {
-        status = s;
+    } else if (!request.requiresCollection()) {
+      for (String liveNode : liveNodes) {
+        final var nodeBaseUrl = Utils.getBaseUrlForNodeName(liveNode, 
urlScheme);
+        requestEndpoints.add(new LBSolrClient.Endpoint(nodeBaseUrl));
       }
-      Object rfObj = header.get(UpdateRequest.REPFACT);
-      if (rfObj != null && rfObj instanceof Integer routeRf) {
-        if (rf == null || routeRf < rf) rf = routeRf;
+    } else { // API call to a particular collection / core / alias (i.e.
+      // request.requiresCollection() == true)
+      Set<String> collectionNames = resolveAliases(inputCollections);
+      if (collectionNames.isEmpty()) {
+        throw new SolrException(
+            SolrException.ErrorCode.BAD_REQUEST,
+            "No collection param specified on request and no default 
collection has been set: "
+                + inputCollections);
       }
 
-      @SuppressWarnings("unchecked")
-      List<SimpleOrderedMap<String>> shardTolerantErrors =
-          (List<SimpleOrderedMap<String>>) header.get("errors");
-      if (null != shardTolerantErrors) {
-        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
-        assert null != shardMaxToleratedErrors
-            : "TolerantUpdateProcessor reported errors but not maxErrors";
-        // if we get into some weird state where the nodes disagree about the 
effective maxErrors,
-        // assume the min value seen to decide if we should fail.
-        maxToleratedErrors =
-            Math.min(
-                maxToleratedErrors,
-                
ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-
-        if (null == toleratedErrors) {
-          toleratedErrors = new 
ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
-        }
-        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
-          toleratedErrors.add(err);
+      List<String> preferredNodes = request.getPreferredNodes();
+      if (preferredNodes != null && !preferredNodes.isEmpty()) {
+        String joinedInputCollections = StrUtils.join(inputCollections, ',');
+        final var endpoints =
+            preferredNodes.stream()
+                .map(nodeName -> Utils.getBaseUrlForNodeName(nodeName, 
urlScheme))
+                .map(nodeUrl -> new LBSolrClient.Endpoint(nodeUrl, 
joinedInputCollections))
+                .collect(Collectors.toList());
+        if (!endpoints.isEmpty()) {
+          LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
+          LBSolrClient.Rsp rsp = getLbClient().request(req);
+          return rsp.getResponse();
         }
       }
-      for (String updateType : Arrays.asList("adds", "deletes", 
"deleteByQuery")) {
-        Object obj = shardResponse.get(updateType);
-        if (obj instanceof NamedList<?> nl) {
-          NamedList<Object> versionsList =
-              versions.containsKey(updateType) ? versions.get(updateType) : 
new NamedList<>();
-          versionsList.addAll(nl);
-          versions.put(updateType, versionsList);
+
+      // TODO: not a big deal because of the caching, but we could avoid 
looking
+      //   at every shard when getting leaders if we tweaked some things
+
+      // Retrieve slices from the cloud state and, for each collection 
specified, add it to the Map
+      // of slices.
+      Map<String, Slice> slices = new HashMap<>();
+      String shardKeys = reqParams.get(ShardParams._ROUTE_);
+      for (String collectionName : collectionNames) {
+        DocCollection col = getDocCollection(collectionName, null);
+        if (col == null) {
+          throw new SolrException(
+              SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + 
collectionName);
         }
+        Collection<Slice> routeSlices = 
col.getRouter().getSearchSlices(shardKeys, reqParams, col);
+        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
       }
-    }
 
-    NamedList<Object> cheader = new NamedList<>();
-    cheader.add("status", status);
-    cheader.add("QTime", timeMillis);
-    if (rf != null) cheader.add(UpdateRequest.REPFACT, rf);
-    if (null != toleratedErrors) {
-      cheader.add("maxErrors", 
ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
-      cheader.add("errors", toleratedErrors);
-      if (maxToleratedErrors < toleratedErrors.size()) {
-        // cumulative errors are too high, we need to throw a client exception 
w/correct metadata
+      // Gather URLs, grouped by leader or replica
+      List<Replica> sortedReplicas = new ArrayList<>();
+      List<Replica> replicas = new ArrayList<>();
+      for (Slice slice : slices.values()) {
+        Replica leader = slice.getLeader();
+        for (Replica replica : slice.getReplicas()) {
+          String node = replica.getNodeName();
+          if (!liveNodes.contains(node) // Must be a live node to continue
+              || replica.getState()
+                  != Replica.State.ACTIVE) { // Must be an ACTIVE replica to 
continue
+            continue;
+          }
+          if (sendToLeaders && replica.equals(leader)) {
+            sortedReplicas.add(replica); // put leaders here eagerly (if 
sendToLeader mode)
+          } else {
+            replicas.add(replica); // replicas here
+          }
+        }
+      }
 
-        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), 
because if that were the
-        // case then at least one shard should have thrown a real error before 
this, so we don't
-        // worry about having a more "singular" exception msg for that 
situation
-        StringBuilder msgBuf =
-            new StringBuilder()
-                .append(toleratedErrors.size())
-                .append(" Async failures during distributed update: ");
+      // Sort the leader replicas, if any, according to the request 
preferences    (none if
+      // !sendToLeaders)
+      replicaListTransformer.transform(sortedReplicas);
 
-        NamedList<String> metadata = new NamedList<>();
-        for (SimpleOrderedMap<String> err : toleratedErrors) {
-          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
-          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+      // Sort the replicas, if any, according to the request preferences and 
append to our list
+      replicaListTransformer.transform(replicas);
 
-          msgBuf.append("\n").append(te.getMessage());
-        }
+      sortedReplicas.addAll(replicas);
 
-        SolrException toThrow =
-            new SolrException(SolrException.ErrorCode.BAD_REQUEST, 
msgBuf.toString());
-        toThrow.setMetadata(metadata);
-        throw toThrow;
+      String joinedInputCollections = StrUtils.join(inputCollections, ',');
+      Set<String> seenNodes = new HashSet<>();
+      sortedReplicas.forEach(
+          replica -> {
+            if (seenNodes.add(replica.getNodeName())) {
+              if (inputCollections.size() == 1 && collectionNames.size() == 1) 
{
+                // If we have a single collection name (and not an alias to 
multiple collection),
+                // send the query directly to a replica of this collection.
+                requestEndpoints.add(
+                    new LBSolrClient.Endpoint(replica.getBaseUrl(), 
replica.getCoreName()));
+              } else {
+                requestEndpoints.add(
+                    new LBSolrClient.Endpoint(replica.getBaseUrl(), 
joinedInputCollections));
+              }
+            }
+          });
+
+      if (requestEndpoints.isEmpty()) {
+        collectionStateCache.keySet().removeAll(collectionNames);
+        throw new SolrException(
+            SolrException.ErrorCode.INVALID_STATE,
+            "Could not find a healthy node to handle the request.");
       }
     }
-    for (Map.Entry<String, NamedList<Object>> entry : versions.entrySet()) {
-      condensed.add(entry.getKey(), entry.getValue());
+
+    LBSolrClient.Req req = new LBSolrClient.Req(request, requestEndpoints);
+    LBSolrClient.Rsp rsp = getLbClient().request(req);
+    return rsp.getResponse();
+  }
+
+  /**
+   * Resolves the input collections to their possible aliased collections. 
Doesn't validate
+   * collection existence.
+   */
+  private Set<String> resolveAliases(List<String> inputCollections) {
+    if (inputCollections.isEmpty()) {
+      return Set.of();
     }
-    condensed.add("responseHeader", cheader);
-    return condensed;
+    LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(); // consistent 
ordering
+    for (String collectionName : inputCollections) {
+      if (getDocCollection(collectionName, -1) == null) {
+        // perhaps it's an alias
+        
uniqueNames.addAll(getClusterStateProvider().resolveAlias(collectionName));
+      } else {
+        uniqueNames.add(collectionName); // it's a collection
+      }
+    }
+    return uniqueNames;
   }
 
-  @SuppressWarnings({"rawtypes"})
-  public RouteResponse condenseResponse(NamedList<?> response, int timeMillis) 
{
-    return condenseResponse(response, timeMillis, RouteResponse::new);
+  /**
+   * If true, this client has been configured such that it will generally 
prefer to send {@link
+   * SolrRequestType#UPDATE} requests to a shard leader, if and only if {@link
+   * UpdateRequest#isSendToLeaders} is also true. If false, then this client 
has been configured to
+   * obey normal routing preferences when dealing with {@link 
SolrRequestType#UPDATE} requests.
+   *
+   * @see #isDirectUpdatesToLeadersOnly
+   */
+  public boolean isUpdatesToLeaders() {
+    return updatesToLeaders;
   }
 
-  @SuppressWarnings({"rawtypes"})
-  public static class RouteResponse<T extends LBSolrClient.Req> extends 
NamedList<Object> {
-    private NamedList<NamedList<?>> routeResponses;
-    private Map<String, T> routes;
+  /**
+   * If true, this client has been configured such that "direct updates" will 
<em>only</em> be sent
+   * to the current leader of the corresponding shard, and will not be retried 
with other replicas.
+   * This method has no effect if {@link #isUpdatesToLeaders()} or {@link
+   * UpdateRequest#isSendToLeaders} returns false.
+   *
+   * <p>A "direct update" is any update that can be sent directly to a single 
shard, and does not
+   * need to be broadcast to every shard. (Example: document updates or 
"delete by id" when using
+   * the default router; non-direct updates are things like commits and 
"delete by query").
+   *
+   * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct 
updates" for different
+   * shards, this client may break the request up and merge the responses.
+   *
+   * @return true if direct updates are sent to shard leaders only
+   */
+  public boolean isDirectUpdatesToLeadersOnly() {
+    return directUpdatesToLeadersOnly;
+  }
 
-    public void setRouteResponses(NamedList<NamedList<?>> routeResponses) {
-      this.routeResponses = routeResponses;
-    }
+  /** Visible for tests so they can assert the configured refresh parallelism. 
*/
+  protected int getStateRefreshParallelism() {
+    return stateRefreshParallelism;
+  }
 
-    public NamedList<NamedList<?>> getRouteResponses() {
-      return routeResponses;
+  protected DocCollection getDocCollection(String collection, Integer 
expectedVersion)
+      throws SolrException {
+    if (expectedVersion == null) {
+      expectedVersion = -1;
+    }
+    if (collection == null) {
+      return null;
     }
 
-    public void setRoutes(Map<String, T> routes) {
-      this.routes = routes;
+    ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(collection);
+    if (cacheEntry != null && 
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+      collectionStateCache.remove(collection, cacheEntry);
+      cacheEntry = null;
     }
 
-    public Map<String, T> getRoutes() {
-      return routes;
+    DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+
+    if (cacheEntry != null && cacheEntry.shouldRetry()) {
+      triggerCollectionRefresh(collection);
     }
-  }
 
-  public static class RouteException extends SolrException {
+    if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+      return cached;
+    }
 
-    private NamedList<Throwable> throwables;
-    private Map<String, ? extends LBSolrClient.Req> routes;
+    CompletableFuture<DocCollection> refreshFuture = 
triggerCollectionRefresh(collection);
+    return waitForCollectionRefresh(collection, refreshFuture);
+  }
 
-    public RouteException(
-        ErrorCode errorCode,
-        NamedList<Throwable> throwables,
-        Map<String, ? extends LBSolrClient.Req> routes) {
-      super(errorCode, throwables.getVal(0).getMessage(), 
throwables.getVal(0));
-      this.throwables = throwables;
-      this.routes = routes;
+  private CompletableFuture<DocCollection> triggerCollectionRefresh(String 
collection) {
+    return collectionRefreshes.compute(
+        collection,
+        (key, existingFuture) -> {
+          // A refresh is still in progress; return it.
+          if (existingFuture != null && !existingFuture.isDone()) {
+            return existingFuture;
+          }
+          // No refresh is in-progress, so trigger it.
 
-      // create a merged copy of the metadata from all wrapped exceptions
-      NamedList<String> metadata = new NamedList<String>();
-      for (int i = 0; i < throwables.size(); i++) {
-        Throwable t = throwables.getVal(i);
-        if (t instanceof SolrException e) {
-          NamedList<String> eMeta = e.getMetadata();
-          if (null != eMeta) {
-            metadata.addAll(eMeta);
+          if (ExecutorUtil.isShutdown(threadPool)) {
+            assert closed; // see close() for the sequence
+            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(key);
+            DocCollection cached = cacheEntry == null ? null : 
cacheEntry.cached;
+            return CompletableFuture.completedFuture(cached);
+          } else {
+            return CompletableFuture.supplyAsync(
+                () -> {
+                  stateRefreshSemaphore.acquireUninterruptibly();
+                  try {
+                    return loadDocCollection(key);
+                  } finally {
+                    stateRefreshSemaphore.release();
+                    // Remove the entry in case of many collections
+                    collectionRefreshes.remove(key);
+                  }
+                },
+                threadPool);
           }
-        }
-      }
-      if (0 < metadata.size()) {
-        this.setMetadata(metadata);
+        });
+  }
+
+  private DocCollection waitForCollectionRefresh(
+      String collection, CompletableFuture<DocCollection> refreshFuture) {
+    try {
+      return refreshFuture.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Interrupted while refreshing state for collection " + collection,
+          e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SolrException) {
+        throw (SolrException) cause;
       }
+      throw new SolrException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Error refreshing state for collection " + collection,
+          cause);
     }
+  }
 
-    public NamedList<Throwable> getThrowables() {
-      return throwables;
+  private DocCollection loadDocCollection(String collection) {
+    ClusterState.CollectionRef ref = getCollectionRef(collection);
+    if (ref == null) {
+      collectionStateCache.remove(collection);
+      return null;
     }
 
-    public Map<String, ? extends LBSolrClient.Req> getRoutes() {
-      return this.routes;
+    DocCollection fetchedCol = ref.get();
+    if (fetchedCol == null) {
+      collectionStateCache.remove(collection);
+      return null;
     }
-  }
 
-  @Override
-  public NamedList<Object> request(SolrRequest<?> request, String collection)
-      throws SolrServerException, IOException {
-    // the collection parameter of the request overrides that of the parameter 
to this method
-    String requestCollection = request.getCollection();
-    if (requestCollection != null) {
-      collection = requestCollection;
-    } else if (collection == null) {
-      collection = defaultCollection;
+    ExpiringCachedDocCollection existing = 
collectionStateCache.peek(collection);
+    if (existing != null && existing.cached.getZNodeVersion() == 
fetchedCol.getZNodeVersion()) {
+      existing.setRetriedAt();
+      existing.maybeStale = false;
+      return existing.cached;
     }
 
-    List<String> inputCollections =
-        collection == null ? List.of() : StrUtils.splitSmart(collection, ",", 
true);
-    return requestWithRetryOnStaleState(
-        request,
-        0,
-        inputCollections,
-        /*skipStateVersion*/ false,
-        Map.of(),
-        /*waitedForRefresh*/ false);
+    collectionStateCache.put(collection, new 
ExpiringCachedDocCollection(fetchedCol));
+    return fetchedCol;
+  }
+
+  ClusterState.CollectionRef getCollectionRef(String collection) {
+    return getClusterStateProvider().getState(collection);
   }
 
   /**
-   * As this class doesn't watch external collections on the client side, 
there's a chance that the
-   * request will fail due to cached stale state, which means the state must 
be refreshed from ZK
-   * and retried.
+   * Useful for determining the minimum achieved replication factor across all 
shards involved in
+   * processing an update request, typically useful for gauging the 
replication factor of a batch.
    */
-  protected NamedList<Object> requestWithRetryOnStaleState(
-      SolrRequest<?> request,
-      int retryCount,
-      List<String> inputCollections,
-      boolean skipStateVersion,
-      Map<String, CompletableFuture<DocCollection>> pendingRefreshes,
-      boolean waitedForRefresh)
-      throws SolrServerException, IOException {
-    // build up a _stateVer_ param to pass to the server containing all the
-    // external collection state versions involved in this request, which 
allows
-    // the server to notify us that our cached state for one or more of the 
external
-    // collections is stale and needs to be refreshed ... this code has no 
impact on internal
-    // collections
-    String stateVerParam = null;
-    List<DocCollection> requestedCollections = null;
-    boolean isCollectionRequestOfV2 = false;
-    if (request instanceof V2Request) {
-      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
-    }
-    boolean isAdmin =
-        request.getRequestType() == SolrRequestType.ADMIN && 
!request.requiresCollection();
-    if (!inputCollections.isEmpty()
-        && !isAdmin
-        && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for 
admin, v2 api requests
-      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
-
-      StringBuilder stateVerParamBuilder = null;
-      for (String requestedCollection : requestedCollectionNames) {
-        // track the version of state we're using on the client side using the 
_stateVer_ param
-        DocCollection coll = getDocCollection(requestedCollection, null);
-        if (coll == null) {
-          throw new SolrException(
-              SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + 
requestedCollection);
-        }
-        int collVer = coll.getZNodeVersion();
-        if (requestedCollections == null)
-          requestedCollections = new 
ArrayList<>(requestedCollectionNames.size());
-        requestedCollections.add(coll);
-
-        if (stateVerParamBuilder == null) {
-          stateVerParamBuilder = new StringBuilder();
-        } else {
-          stateVerParamBuilder.append(
-              "|"); // hopefully pipe is not an allowed char in a collection 
name
-        }
-
-        
stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
-      }
+  public int getMinAchievedReplicationFactor(String collection, NamedList<?> 
resp) {
+    // it's probably already on the top-level header set by condense
+    NamedList<?> header = (NamedList<?>) resp.get("responseHeader");
+    Integer achRf = (Integer) header.get(UpdateRequest.REPFACT);
+    if (achRf != null) return achRf.intValue();
 
-      if (stateVerParamBuilder != null) {
-        stateVerParam = stateVerParamBuilder.toString();
+    // not on the top-level header, walk the shard route tree
+    Map<String, Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
       }
     }
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
 
-    if (request.getParams() instanceof ModifiableSolrParams params) {
-      if (!skipStateVersion && stateVerParam != null) {
-        params.set(STATE_VERSION, stateVerParam);
-      } else {
-        params.remove(STATE_VERSION);
-      }
-    } // else: ??? how to set this ???
-
-    NamedList<Object> resp = null;
-    try {
-      resp = sendRequest(request, inputCollections);
-      // to avoid an O(n) operation we always add STATE_VERSION to the last 
and try to read it from
-      // there
-      Object o = resp == null || resp.size() == 0 ? null : 
resp.get(STATE_VERSION, resp.size() - 1);
-      if (o != null && o instanceof Map<?, ?> invalidStates) {
-        // remove this because no one else needs this and tests would fail if 
they are comparing
-        // responses
-        resp.remove(resp.size() - 1);
-        for (Map.Entry<?, ?> e : invalidStates.entrySet()) {
-          getDocCollection((String) e.getKey(), (Integer) e.getValue());
-        }
-      }
-    } catch (Exception exc) {
-
-      Throwable rootCause = SolrException.getRootCause(exc);
-      // don't do retry support for admin requests
-      // or if the request doesn't have a collection specified
-      // or request is v2 api and its method is not GET
-      if (inputCollections.isEmpty()
-          || isAdmin
-          || (request.getApiVersion() == SolrRequest.ApiVersion.V2
-              && request.getMethod() != SolrRequest.METHOD.GET)) {
-        if (exc instanceof SolrServerException) {
-          throw (SolrServerException) exc;
-        } else if (exc instanceof IOException) {
-          throw (IOException) exc;
-        } else if (exc instanceof RuntimeException) {
-          throw (RuntimeException) exc;
-        } else {
-          throw new SolrServerException(rootCause);
+  /**
+   * Walks the NamedList response after performing an update request looking 
for the replication
+   * factor that was achieved in each shard involved in the request. For 
single doc updates, there
+   * will be only one shard in the return value.
+   */
+  public Map<String, Integer> getShardReplicationFactor(String collection, 
NamedList<?> resp) {
+    Map<String, Integer> results = new HashMap<>();
+    if (resp instanceof RouteResponse) {
+      NamedList<NamedList<?>> routes = ((RouteResponse<?>) 
resp).getRouteResponses();
+      DocCollection coll = getDocCollection(collection, null);
+      Map<String, String> leaders = new HashMap<>();
+      for (Slice slice : coll.getActiveSlices()) {
+        Replica leader = slice.getLeader();
+        if (leader != null) {
+          String leaderUrl = leader.getBaseUrl() + "/" + leader.getCoreName();
+          leaders.put(leaderUrl, slice.getName());
+          String altLeaderUrl = leader.getBaseUrl() + "/" + collection;
+          leaders.put(altLeaderUrl, slice.getName());
         }
       }
 
-      int errorCode =
-          (rootCause instanceof SolrException)
-              ? ((SolrException) rootCause).code()
-              : SolrException.ErrorCode.UNKNOWN.code;
-
-      final boolean wasCommError = wasCommError(rootCause);
-
-      if (wasCommError
-          || (exc instanceof RouteException
-              && (errorCode == 503)) // 404 because the core does not exist 
503 service unavailable
-      // TODO there are other reasons for 404. We need to change the solr 
response format from HTML
-      // to structured data to know that
-      ) {
-        // it was a communication error. it is likely that
-        // the node to which the request to be sent is down . So , expire the 
state
-        // so that the next attempt would fetch the fresh state
-        // just re-read state for all of them, if it has not been retried
-        // in retryExpiryTime time
-        if (requestedCollections != null) {
-          for (DocCollection ext : requestedCollections) {
-            String name = ext.getName();
-            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(name);
-            if (cacheEntry != null) {
-              if (wasCommError) {
-                cacheEntry.maybeStale = true;
-              } else {
-                boolean markedStale =
-                    
cacheEntry.markMaybeStaleIfOutsideBackoff(retryExpiryTimeNano);
-                if (markedStale && cacheEntry.shouldRetry()) {
-                  triggerCollectionRefresh(name);
-                }
-              }
-            } else {
-              triggerCollectionRefresh(name);
+      Iterator<Map.Entry<String, NamedList<?>>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        Map.Entry<String, NamedList<?>> next = routeIter.next();
+        String host = next.getKey();
+        NamedList<?> hostResp = next.getValue();
+        Integer rf =
+            (Integer) ((NamedList<?>) 
hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null) {
+            if (host.endsWith("/")) shard = leaders.get(host.substring(0, 
host.length() - 1));
+            if (shard == null) {
+              shard = host;
             }
           }
+          results.put(shard, rf);
         }
-        if (retryCount < MAX_STALE_RETRIES) { // if it is a communication 
error , we must try again
-          // may be, we have a stale version of the collection state,
-          // and we could not get any information from the server
-          // it is probably not worth trying again and again because
-          // the state would not have been updated
-          log.info(
-              "Request to collection {} failed due to ({}) {}, retry={} 
maxRetries={} commError={} errorCode={} - retrying",
-              inputCollections,
-              errorCode,
-              rootCause,
-              retryCount,
-              MAX_STALE_RETRIES,
-              wasCommError,
-              errorCode);
-          return requestWithRetryOnStaleState(
-              request,
-              retryCount + 1,
-              inputCollections,
-              skipStateVersion,
-              pendingRefreshes,
-              waitedForRefresh);
-        }
-      } else {
-        log.info("request was not communication error it seems");
       }
-      log.info(
-          "Request to collection {} failed due to ({}) {}, retry={} 
maxRetries={} commError={} errorCode={} ",
-          inputCollections,
-          errorCode,
-          rootCause,
-          retryCount,
-          MAX_STALE_RETRIES,
-          wasCommError,
-          errorCode);
-
-      boolean stateWasStale = false;
-      if (retryCount < MAX_STALE_RETRIES
-          && requestedCollections != null
-          && !requestedCollections.isEmpty()
-          && (SolrException.ErrorCode.getErrorCode(errorCode)
-                  == SolrException.ErrorCode.INVALID_STATE
-              || errorCode == 404)) {
-        // cached state for one or more external collections was stale
-        // re-issue request using updated state
-        stateWasStale = true;
+    }
+    return results;
+  }
 
-        // just re-read state for all of them, which is a little heavy-handed 
but hopefully a rare
-        // occurrence
-        for (DocCollection ext : requestedCollections) {
-          collectionStateCache.remove(ext.getName());
-        }
-      }
+  /**
+   * Constructs {@link CloudSolrClient} instances from provided configuration. 
It will use a Jetty
+   * based {@code HttpClient} if available, or will otherwise use the JDK.
+   */
+  public static class Builder {
 
-      // if we experienced a communication error, it's worth checking the state
-      // with ZK just to make sure the node we're trying to hit is still part 
of the collection
-      if (retryCount < MAX_STALE_RETRIES
-          && !stateWasStale
-          && requestedCollections != null
-          && !requestedCollections.isEmpty()
-          && wasCommError) {
-        for (DocCollection ext : requestedCollections) {
-          DocCollection latestStateFromZk = getDocCollection(ext.getName(), 
null);
-          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
-            // looks like we couldn't reach the server because the state was 
stale == retry
-            stateWasStale = true;
-            // we just pulled state from ZK, so update the cache so that the 
retry uses it
-            collectionStateCache.put(
-                ext.getName(), new 
ExpiringCachedDocCollection(latestStateFromZk));
-          }
-        }
-      }
-
-      // if the state was stale, then we retry the request once with new state 
pulled from Zk
-      if (stateWasStale) {
-        log.warn(
-            "Re-trying request to collection(s) {} after stale state error 
from server.",
-            inputCollections);
-
-        Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor = 
pendingRefreshes;
-        if (!waitedForRefresh && (pendingRefreshes == null || 
pendingRefreshes.isEmpty())) {
-          refreshesToWaitFor = new HashMap<>();
-          for (DocCollection ext : requestedCollections) {
-            refreshesToWaitFor.put(ext.getName(), 
triggerCollectionRefresh(ext.getName()));
-          }
-        }
+    protected Collection<String> zkHosts = new ArrayList<>();
+    protected List<String> solrUrls = new ArrayList<>();
+    protected String zkChroot;
+    protected HttpSolrClient httpClient;
+    protected boolean shardLeadersOnly = true;
+    protected boolean directUpdatesToLeadersOnly = false;
+    protected boolean parallelUpdates = true;
+    protected ClusterStateProvider stateProvider;
+    protected HttpSolrClient.BuilderBase<?, ?> internalClientBuilder;
+    protected RequestWriter requestWriter;
+    protected ResponseParser responseParser;
+    protected long retryExpiryTimeNano =
+        TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3 
million nanos
 
-        // First retry without sending state versions so the server does not 
immediately reject the
-        // request while we intentionally rely on stale routing (e.g., to 
allow forwarding to a new
-        // leader) as the background refresh completes.
-        if (!skipStateVersion && !waitedForRefresh) {
-          resp =
-              requestWithRetryOnStaleState(
-                  request,
-                  retryCount + 1,
-                  inputCollections,
-                  /*skipStateVersion*/ true,
-                  refreshesToWaitFor,
-                  waitedForRefresh);
-        } else if (!waitedForRefresh
-            && refreshesToWaitFor != null
-            && !refreshesToWaitFor.isEmpty()) {
-          for (Map.Entry<String, CompletableFuture<DocCollection>> entry :
-              refreshesToWaitFor.entrySet()) {
-            waitForCollectionRefresh(entry.getKey(), entry.getValue());
-          }
-          resp =
-              requestWithRetryOnStaleState(
-                  request,
-                  retryCount + 1,
-                  inputCollections,
-                  /*skipStateVersion*/ false,
-                  Map.of(),
-                  /*waitedForRefresh*/ true);
-        } else {
-          resp =
-              requestWithRetryOnStaleState(
-                  request,
-                  retryCount + 1,
-                  inputCollections,
-                  /*skipStateVersion*/ false,
-                  Map.of(),
-                  /*waitedForRefresh*/ waitedForRefresh);
-        }
-      } else {
-        if (exc instanceof SolrException
-            || exc instanceof SolrServerException
-            || exc instanceof IOException) {
-          throw exc;
-        } else {
-          throw new SolrServerException(rootCause);
-        }
-      }
+    protected String defaultCollection;
+    protected long timeToLiveSeconds = 60;
+    protected int parallelCacheRefreshesLocks = 
DEFAULT_STATE_REFRESH_PARALLELISM;
+    protected int zkConnectTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
+    protected int zkClientTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
+    protected boolean canUseZkACLs = true;
 
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
+    /**
+     * Provide a series of Solr URLs to be used when configuring {@link 
CloudSolrClient} instances.
+     * The solr client will use these urls to understand the cluster topology, 
which solr nodes are
+     * active etc.
+     *
+     * <p>Provided Solr URLs are expected to point to the root Solr path
+     * ("http://hostname:8983/solr";); they should not include any collections, 
cores, or other path
+     * components.
+     *
+     * <p>Usage example:
+     *
+     * <pre>
+     *   final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
+     *   solrBaseUrls.add("http://solr1:8983/solr";); 
solrBaseUrls.add("http://solr2:8983/solr";); 
solrBaseUrls.add("http://solr3:8983/solr";);
+     *   final SolrClient client = new 
CloudSolrClient.Builder(solrBaseUrls).build();
+     * </pre>
+     */
+    public Builder(List<String> solrUrls) {
+      this.solrUrls = solrUrls;
     }
 
-    return resp;
-  }
-
-  protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String> 
inputCollections)
-      throws SolrServerException, IOException {
-    boolean sendToLeaders = false;
-
-    if (request.getRequestType() == SolrRequestType.UPDATE) {
-      sendToLeaders = this.isUpdatesToLeaders();
-
-      if (sendToLeaders && request instanceof UpdateRequest updateRequest) {
-        sendToLeaders = sendToLeaders && updateRequest.isSendToLeaders();
-
-        // Check if we can do a "directUpdate" ...
-        if (sendToLeaders) {
-          if (inputCollections.size() > 1) {
-            throw new SolrException(
-                SolrException.ErrorCode.BAD_REQUEST,
-                "Update request must be sent to a single collection "
-                    + "or an alias: "
-                    + inputCollections);
-          }
-          String collection =
-              inputCollections.isEmpty()
-                  ? null
-                  : inputCollections.get(0); // getting first mimics 
HttpSolrCall
-          NamedList<Object> response = directUpdate(updateRequest, collection);
-          if (response != null) {
-            return response;
-          }
-        }
-      }
+    /**
+     * Provide a series of ZK hosts which will be used when configuring {@link 
CloudSolrClient}
+     * instances.
+     *
+     * <p>Usage example when Solr stores data at the ZooKeeper root ('/'):
+     *
+     * <pre>
+     *   final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     *   zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); 
zkServers.add("zookeeper3:2181");
+     *   final SolrClient client = new CloudSolrClient.Builder(zkServers, 
Optional.empty()).build();
+     * </pre>
+     *
+     * Usage example when Solr data is stored in a ZooKeeper chroot:
+     *
+     * <pre>
+     *    final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     *    zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); 
zkServers.add("zookeeper3:2181");
+     *    final SolrClient client = new CloudSolrClient.Builder(zkServers, 
Optional.of("/solr")).build();
+     *  </pre>
+     *
+     * @param zkHosts a List of at least one ZooKeeper host and port (e.g. 
"zookeeper1:2181")
+     * @param zkChroot the path to the root ZooKeeper node containing Solr 
data. Provide {@code
+     *     java.util.Optional.empty()} if no ZK chroot is used.
+     * @deprecated Use a connectionString constructor and/or prefer HTTP URLs 
instead.
+     */
+    @Deprecated(since = "10.1") // sort of 10.0 but accidentally removed
+    public Builder(List<String> zkHosts, Optional<String> zkChroot) {
+      this.zkHosts = zkHosts;
+      if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
     }
 
-    SolrParams reqParams = request.getParams();
-    assert reqParams != null;
-
-    ReplicaListTransformer replicaListTransformer =
-        requestRLTGenerator.getReplicaListTransformer(reqParams);
-
-    final ClusterStateProvider provider = getClusterStateProvider();
-    final String urlScheme = provider.getUrlScheme();
-    final Set<String> liveNodes = provider.getLiveNodes();
-
-    final List<LBSolrClient.Endpoint> requestEndpoints =
-        new ArrayList<>(); // we populate this as follows...
-
-    if (request.getApiVersion() == SolrRequest.ApiVersion.V2) {
-      if (!liveNodes.isEmpty()) {
-        List<String> liveNodesList = new ArrayList<>(liveNodes);
-        Collections.shuffle(liveNodesList, rand);
-        final var chosenNodeUrl = 
Utils.getBaseUrlForNodeName(liveNodesList.get(0), urlScheme);
-        requestEndpoints.add(new LBSolrClient.Endpoint(chosenNodeUrl));
-      }
-
-    } else if (!request.requiresCollection()) {
-      for (String liveNode : liveNodes) {
-        final var nodeBaseUrl = Utils.getBaseUrlForNodeName(liveNode, 
urlScheme);
-        requestEndpoints.add(new LBSolrClient.Endpoint(nodeBaseUrl));
-      }
-    } else { // API call to a particular collection / core / alias (i.e.
-      // request.requiresCollection() == true)
-      Set<String> collectionNames = resolveAliases(inputCollections);
-      if (collectionNames.isEmpty()) {
-        throw new SolrException(
-            SolrException.ErrorCode.BAD_REQUEST,
-            "No collection param specified on request and no default 
collection has been set: "
-                + inputCollections);
-      }
-
-      List<String> preferredNodes = request.getPreferredNodes();
-      if (preferredNodes != null && !preferredNodes.isEmpty()) {
-        String joinedInputCollections = StrUtils.join(inputCollections, ',');
-        final var endpoints =
-            preferredNodes.stream()
-                .map(nodeName -> Utils.getBaseUrlForNodeName(nodeName, 
urlScheme))
-                .map(nodeUrl -> new LBSolrClient.Endpoint(nodeUrl, 
joinedInputCollections))
-                .collect(Collectors.toList());
-        if (!endpoints.isEmpty()) {
-          LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
-          LBSolrClient.Rsp rsp = getLbClient().request(req);
-          return rsp.getResponse();
-        }
-      }
-
-      // TODO: not a big deal because of the caching, but we could avoid 
looking
-      //   at every shard when getting leaders if we tweaked some things
+    /** for an expert use-case */
+    public Builder(ClusterStateProvider stateProvider) {
+      this.stateProvider = stateProvider;
+    }
 
-      // Retrieve slices from the cloud state and, for each collection 
specified, add it to the Map
-      // of slices.
-      Map<String, Slice> slices = new HashMap<>();
-      String shardKeys = reqParams.get(ShardParams._ROUTE_);
-      for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(collectionName, null);
-        if (col == null) {
-          throw new SolrException(
-              SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + 
collectionName);
-        }
-        Collection<Slice> routeSlices = 
col.getRouter().getSearchSlices(shardKeys, reqParams, col);
-        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
-      }
+    /**
+     * Creates a client builder based on a connection string of 2 possible 
formats:
+     *
+     * <ul>
+     *   <li>ZooKeeper connection string (optionally with chroot), e.g. {@code
+     *       zk1:2181,zk2:2181,zk3:2181/solr}
+     *   <li>Comma-separated list of Solr node base URLs (HTTP or HTTPS), e.g. 
{@code
+     *       http://solr1:8983/solr,http://solr2:8983/solr}
+     * </ul>
+     *
+     * @param connectionString a string specifying either ZooKeeper connection 
string or HTTP(S)
+     *     Solr URLs
+     * @throws IllegalArgumentException if string is null, empty, or malformed
+     */
+    public Builder(String connectionString) {
+      this(CloudSolrClientConnection.parse(connectionString));
+    }
 
-      // Gather URLs, grouped by leader or replica
-      List<Replica> sortedReplicas = new ArrayList<>();
-      List<Replica> replicas = new ArrayList<>();
-      for (Slice slice : slices.values()) {
-        Replica leader = slice.getLeader();
-        for (Replica replica : slice.getReplicas()) {
-          String node = replica.getNodeName();
-          if (!liveNodes.contains(node) // Must be a live node to continue
-              || replica.getState()
-                  != Replica.State.ACTIVE) { // Must be an ACTIVE replica to 
continue
-            continue;
-          }
-          if (sendToLeaders && replica.equals(leader)) {
-            sortedReplicas.add(replica); // put leaders here eagerly (if 
sendToLeader mode)
-          } else {
-            replicas.add(replica); // replicas here
-          }
-        }
+    /**
+     * Creates a client builder from a {@link CloudSolrClientConnection}.
+     *
+     * @param connection instance of {@link CloudSolrClientConnection}, which 
can be obtained from
+     *     the solr connection string or created via the constructor
+     */
+    public Builder(CloudSolrClientConnection connection) {
+      if (connection.isZookeeper()) {
+        this.zkHosts = connection.quorumItems();
+        this.zkChroot = connection.zkChroot();
+      } else {
+        this.solrUrls = connection.quorumItems();
       }
+    }
 
-      // Sort the leader replicas, if any, according to the request 
preferences    (none if
-      // !sendToLeaders)
-      replicaListTransformer.transform(sortedReplicas);
+    /** Whether to use the default ZK ACLs when building a ZK Client. */
+    public Builder canUseZkACLs(boolean canUseZkACLs) {
+      this.canUseZkACLs = canUseZkACLs;
+      return this;
+    }
 
-      // Sort the replicas, if any, according to the request preferences and 
append to our list
-      replicaListTransformer.transform(replicas);
+    /**
+     * Tells {@link Builder} that created clients should be configured such 
that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
+     *
+     * @see #sendUpdatesToAnyReplica
+     * @see CloudSolrClient#isUpdatesToLeaders
+     */
+    public Builder sendUpdatesOnlyToShardLeaders() {
+      shardLeadersOnly = true;
+      return this;
+    }
 
-      sortedReplicas.addAll(replicas);
+    /**
+     * Tells {@link Builder} that created clients should be configured such 
that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
+     *
+     * @see #sendUpdatesOnlyToShardLeaders
+     * @see CloudSolrClient#isUpdatesToLeaders
+     */
+    public Builder sendUpdatesToAnyReplica() {
+      shardLeadersOnly = false;
+      return this;
+    }
 
-      String joinedInputCollections = StrUtils.join(inputCollections, ',');
-      Set<String> seenNodes = new HashSet<>();
-      sortedReplicas.forEach(
-          replica -> {
-            if (seenNodes.add(replica.getNodeName())) {
-              if (inputCollections.size() == 1 && collectionNames.size() == 1) 
{
-                // If we have a single collection name (and not an alias to 
multiple collection),
-                // send the query directly to a replica of this collection.
-                requestEndpoints.add(
-                    new LBSolrClient.Endpoint(replica.getBaseUrl(), 
replica.getCoreName()));
-              } else {
-                requestEndpoints.add(
-                    new LBSolrClient.Endpoint(replica.getBaseUrl(), 
joinedInputCollections));
-              }
-            }
-          });
+    /**
+     * Tells {@link CloudSolrClient.Builder} that created clients should send 
direct updates to
+     * shard leaders only.
+     *
+     * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the 
client side with a
+     * {@link SolrException}
+     *
+     * @see #sendDirectUpdatesToAnyShardReplica
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
+     */
+    public Builder sendDirectUpdatesToShardLeadersOnly() {
+      directUpdatesToLeadersOnly = true;
+      return this;
+    }
 
-      if (requestEndpoints.isEmpty()) {
-        collectionStateCache.keySet().removeAll(collectionNames);
-        throw new SolrException(
-            SolrException.ErrorCode.INVALID_STATE,
-            "Could not find a healthy node to handle the request.");
-      }
+    /**
+     * Tells {@link CloudSolrClient.Builder} that created clients can send 
updates to any shard
+     * replica (shard leaders and non-leaders).
+     *
+     * <p>Shard leaders are still preferred, but the created clients will fall 
back to using other
+     * replicas if a leader cannot be found.
+     *
+     * @see #sendDirectUpdatesToShardLeadersOnly
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
+     */
+    public Builder sendDirectUpdatesToAnyShardReplica() {
+      directUpdatesToLeadersOnly = false;
+      return this;
     }
 
-    LBSolrClient.Req req = new LBSolrClient.Req(request, requestEndpoints);
-    LBSolrClient.Rsp rsp = getLbClient().request(req);
-    return rsp.getResponse();
-  }
-
-  /**
-   * Resolves the input collections to their possible aliased collections. 
Doesn't validate
-   * collection existence.
-   */
-  private Set<String> resolveAliases(List<String> inputCollections) {
-    if (inputCollections.isEmpty()) {
-      return Set.of();
-    }
-    LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(); // consistent 
ordering
-    for (String collectionName : inputCollections) {
-      if (getDocCollection(collectionName, -1) == null) {
-        // perhaps it's an alias
-        
uniqueNames.addAll(getClusterStateProvider().resolveAlias(collectionName));
-      } else {
-        uniqueNames.add(collectionName); // it's a collection
-      }
+    /** Provides a {@link RequestWriter} for created clients to use when 
handing requests. */
+    public Builder withRequestWriter(RequestWriter requestWriter) {
+      this.requestWriter = requestWriter;
+      return this;
     }
-    return uniqueNames;
-  }
 
-  /**
-   * If true, this client has been configured such that it will generally 
prefer to send {@link
-   * SolrRequestType#UPDATE} requests to a shard leader, if and only if {@link
-   * UpdateRequest#isSendToLeaders} is also true. If false, then this client 
has been configured to
-   * obey normal routing preferences when dealing with {@link 
SolrRequestType#UPDATE} requests.
-   *
-   * @see #isDirectUpdatesToLeadersOnly
-   */
-  public boolean isUpdatesToLeaders() {
-    return updatesToLeaders;
-  }
+    /** Provides a {@link ResponseParser} for created clients to use when 
handling requests. */
+    public Builder withResponseParser(ResponseParser responseParser) {
+      this.responseParser = responseParser;
+      return this;
+    }
 
-  /**
-   * If true, this client has been configured such that "direct updates" will 
<em>only</em> be sent
-   * to the current leader of the corresponding shard, and will not be retried 
with other replicas.
-   * This method has no effect if {@link #isUpdatesToLeaders()} or {@link
-   * UpdateRequest#isSendToLeaders} returns false.
-   *
-   * <p>A "direct update" is any update that can be sent directly to a single 
shard, and does not
-   * need to be broadcast to every shard. (Example: document updates or 
"delete by id" when using
-   * the default router; non-direct updates are things like commits and 
"delete by query").
-   *
-   * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct 
updates" for different
-   * shards, this client may break the request up and merge the responses.
-   *
-   * @return true if direct updates are sent to shard leaders only
-   */
-  public boolean isDirectUpdatesToLeadersOnly() {
-    return directUpdatesToLeadersOnly;
-  }
+    /**
+     * Tells {@link CloudSolrClient.Builder} whether created clients should 
send shard updates
+     * serially or in parallel
+     *
+     * <p>When an {@link UpdateRequest} affects multiple shards, {@link 
CloudSolrClient} splits it
+     * up and sends a request to each affected shard. This setting chooses 
whether those
+     * sub-requests are sent serially or in parallel.
+     *
+     * <p>If not set, this defaults to 'true' and sends sub-requests in 
parallel.
+     */
+    public Builder withParallelUpdates(boolean parallelUpdates) {
+      this.parallelUpdates = parallelUpdates;
+      return this;
+    }
 
-  /** Visible for tests so they can assert the configured refresh parallelism. 
*/
-  protected int getStateRefreshParallelism() {
-    return stateRefreshParallelism;
-  }
+    /**
+     * Configures how many collection state refresh operations may run in 
parallel using a dedicated
+     * thread pool. This controls the maximum number of concurrent 
ZooKeeper/cluster state lookups.
+     *
+     * <p>Defaults to 5.
+     */
+    public Builder withParallelCacheRefreshes(int parallelCacheRefreshesLocks) 
{
+      this.parallelCacheRefreshesLocks = parallelCacheRefreshesLocks;
+      return this;
+    }
 
-  protected DocCollection getDocCollection(String collection, Integer 
expectedVersion)
-      throws SolrException {
-    if (expectedVersion == null) {
-      expectedVersion = -1;
+    /**
+     * This is the time to wait to re-fetch the state after getting the same 
state version from ZK
+     */
+    public Builder withRetryExpiryTime(long expiryTime, TimeUnit unit) {
+      this.retryExpiryTimeNano = TimeUnit.NANOSECONDS.convert(expiryTime, 
unit);
+      return this;
     }
-    if (collection == null) {
-      return null;
+
+    /** Sets the default collection for request. */
+    public Builder withDefaultCollection(String defaultCollection) {
+      this.defaultCollection = defaultCollection;
+      return this;
     }
 
-    ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(collection);
-    if (cacheEntry != null && 
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
-      collectionStateCache.remove(collection, cacheEntry);
-      cacheEntry = null;
+    /**
+     * Sets the cache ttl for DocCollection Objects cached.
+     *
+     * @param timeToLive ttl value
+     */
+    public Builder withCollectionCacheTtl(long timeToLive, TimeUnit unit) {
+      assert timeToLive > 0;
+      this.timeToLiveSeconds = TimeUnit.SECONDS.convert(timeToLive, unit);
+      return this;
     }
 
-    DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+    /**
+     * Set the internal Solr HTTP client.
+     *
+     * <p>Note: closing the client instance is the responsibility of the 
caller.
+     *
+     * @return this
+     */
+    public Builder withHttpClient(HttpSolrClient httpSolrClient) {
+      if (this.internalClientBuilder != null) {
+        throw new IllegalStateException(
+            "The builder can't accept an httpClient AND an 
internalClientBuilder, only one of those can be provided");
+      }
+      this.httpClient = httpSolrClient;
+      return this;
+    }
 
-    if (cacheEntry != null && cacheEntry.shouldRetry()) {
-      triggerCollectionRefresh(collection);
+    /**
+     * If provided, the CloudSolrClient will build it's internal client using 
this builder (instead
+     * of the empty default one). Providing this builder allows users to 
configure the internal
+     * clients (authentication, timeouts, etc.).
+     *
+     * @param internalClientBuilder the builder to use for creating the 
internal http client.
+     * @return this
+     */
+    public Builder withHttpClientBuilder(HttpSolrClient.BuilderBase<?, ?> 
internalClientBuilder) {
+      if (this.httpClient != null) {
+        throw new IllegalStateException(
+            "The builder can't accept an httpClient AND an 
internalClientBuilder, only one of those can be provided");
+      }
+      this.internalClientBuilder = internalClientBuilder;
+      return this;
     }
 
-    if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
-      return cached;
+    @Deprecated(since = "9.10")
+    public Builder withInternalClientBuilder(
+        HttpSolrClient.BuilderBase<?, ?> internalClientBuilder) {
+      return withHttpClientBuilder(internalClientBuilder);
     }
 
-    CompletableFuture<DocCollection> refreshFuture = 
triggerCollectionRefresh(collection);
-    return waitForCollectionRefresh(collection, refreshFuture);
-  }
+    /**
+     * Sets the Zk connection timeout
+     *
+     * @param zkConnectTimeout timeout value
+     * @param unit time unit
+     */
+    public Builder withZkConnectTimeout(int zkConnectTimeout, TimeUnit unit) {
+      this.zkConnectTimeout = Math.toIntExact(unit.toMillis(zkConnectTimeout));
+      return this;
+    }
 
-  private CompletableFuture<DocCollection> triggerCollectionRefresh(String 
collection) {
-    return collectionRefreshes.compute(
-        collection,
-        (key, existingFuture) -> {
-          // A refresh is still in progress; return it.
-          if (existingFuture != null && !existingFuture.isDone()) {
-            return existingFuture;
-          }
-          // No refresh is in-progress, so trigger it.
+    /**
+     * Sets the Zk client session timeout
+     *
+     * @param zkClientTimeout timeout value
+     * @param unit time unit
+     */
+    public Builder withZkClientTimeout(int zkClientTimeout, TimeUnit unit) {
+      this.zkClientTimeout = Math.toIntExact(unit.toMillis(zkClientTimeout));
+      return this;
+    }
 
-          if (ExecutorUtil.isShutdown(threadPool)) {
-            assert closed; // see close() for the sequence
-            ExpiringCachedDocCollection cacheEntry = 
collectionStateCache.peek(key);
-            DocCollection cached = cacheEntry == null ? null : 
cacheEntry.cached;
-            return CompletableFuture.completedFuture(cached);
-          } else {
-            return CompletableFuture.supplyAsync(
-                () -> {
-                  stateRefreshSemaphore.acquireUninterruptibly();
-                  try {
-                    return loadDocCollection(key);
-                  } finally {
-                    stateRefreshSemaphore.release();
-                    // Remove the entry in case of many collections
-                    collectionRefreshes.remove(key);
-                  }
-                },
-                threadPool);
-          }
-        });
-  }
+    /** Create a {@link CloudSolrClient} based on the provided configuration. 
*/
+    public CloudHttp2SolrClient build() {
+      int providedOptions = 0;
+      if (!zkHosts.isEmpty()) providedOptions++;
+      if (!solrUrls.isEmpty()) providedOptions++;
+      if (stateProvider != null) providedOptions++;
 
-  private DocCollection waitForCollectionRefresh(
-      String collection, CompletableFuture<DocCollection> refreshFuture) {
-    try {
-      return refreshFuture.get();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new SolrException(
-          SolrException.ErrorCode.SERVER_ERROR,
-          "Interrupted while refreshing state for collection " + collection,
-          e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof SolrException) {
-        throw (SolrException) cause;
+      if (providedOptions > 1) {
+        throw new IllegalArgumentException(
+            "Only one of zkHost(s), solrUrl(s), or stateProvider should be 
specified.");
+      } else if (providedOptions == 0) {
+        throw new IllegalArgumentException(
+            "One of zkHosts, solrUrls, or stateProvider must be specified.");
       }
-      throw new SolrException(
-          SolrException.ErrorCode.SERVER_ERROR,
-          "Error refreshing state for collection " + collection,
-          cause);
+
+      return new CloudHttp2SolrClient(this);
     }
-  }
 
-  private DocCollection loadDocCollection(String collection) {
-    ClusterState.CollectionRef ref = getCollectionRef(collection);
-    if (ref == null) {
-      collectionStateCache.remove(collection);
-      return null;
+    protected HttpSolrClient createOrGetHttpClient() {
+      if (httpClient != null) {
+        return httpClient;
+      } else if (internalClientBuilder != null) {
+        return internalClientBuilder.build();
+      } else {
+        return HttpSolrClient.builder(null).build();
+      }
     }
 
-    DocCollection fetchedCol = ref.get();
-    if (fetchedCol == null) {
-      collectionStateCache.remove(collection);
-      return null;
+    protected LBSolrClient createOrGetLbClient(HttpSolrClient myClient) {
+      return myClient.createLBSolrClient();
     }
 
-    ExpiringCachedDocCollection existing = 
collectionStateCache.peek(collection);
-    if (existing != null && existing.cached.getZNodeVersion() == 
fetchedCol.getZNodeVersion()) {
-      existing.setRetriedAt();
-      existing.maybeStale = false;
-      return existing.cached;
+    protected ClusterStateProvider createZkClusterStateProvider() {
+      ClusterStateProvider stateProvider =
+          ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot, 
canUseZkACLs);
+      if (stateProvider instanceof 
SolrZkClientTimeout.SolrZkClientTimeoutAware timeoutAware) {
+        timeoutAware.setZkClientTimeout(zkClientTimeout);
+        timeoutAware.setZkConnectTimeout(zkConnectTimeout);
+      }
+      return stateProvider;
     }
 
-    collectionStateCache.put(collection, new 
ExpiringCachedDocCollection(fetchedCol));
-    return fetchedCol;
-  }
-
-  ClusterState.CollectionRef getCollectionRef(String collection) {
-    return getClusterStateProvider().getState(collection);
+    protected ClusterStateProvider 
createHttpClusterStateProvider(HttpSolrClient httpClient) {
+      try {
+        return new HttpClusterStateProvider<>(solrUrls, httpClient);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Couldn't initialize a HttpClusterStateProvider (is/are the "
+                + "Solr server(s), "
+                + solrUrls
+                + ", down?)",
+            e);
+      }
+    }
   }
 
-  /**
-   * Useful for determining the minimum achieved replication factor across all 
shards involved in
-   * processing an update request, typically useful for gauging the 
replication factor of a batch.
-   */
-  public int getMinAchievedReplicationFactor(String collection, NamedList<?> 
resp) {
-    // it's probably already on the top-level header set by condense
-    NamedList<?> header = (NamedList<?>) resp.get("responseHeader");
-    Integer achRf = (Integer) header.get(UpdateRequest.REPFACT);
-    if (achRf != null) return achRf.intValue();
+  protected static class StateCache extends ConcurrentHashMap<String, 
ExpiringCachedDocCollection> {
+    final AtomicLong puts = new AtomicLong();
+    final AtomicLong hits = new AtomicLong();
+    final Lock evictLock = new ReentrantLock(true);
+    public volatile long timeToLiveMs = 60 * 1000L;
 
-    // not on the top-level header, walk the shard route tree
-    Map<String, Integer> shardRf = getShardReplicationFactor(collection, resp);
-    for (Integer rf : shardRf.values()) {
-      if (achRf == null || rf < achRf) {
-        achRf = rf;
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if (val == null) {
+        // a new collection is likely to be added now.
+        // check if there are stale items and remove them
+        evictStale();
+        return null;
+      }
+      if (val.isExpired(timeToLiveMs)) {
+        super.remove(key);
+        return null;
       }
+      hits.incrementAndGet();
+      return val;
     }
-    return (achRf != null) ? achRf.intValue() : -1;
-  }
 
-  /**
-   * Walks the NamedList response after performing an update request looking 
for the replication
-   * factor that was achieved in each shard involved in the request. For 
single doc updates, there
-   * will be only one shard in the return value.
-   */
-  public Map<String, Integer> getShardReplicationFactor(String collection, 
NamedList<?> resp) {
-    Map<String, Integer> results = new HashMap<>();
-    if (resp instanceof RouteResponse) {
-      NamedList<NamedList<?>> routes = ((RouteResponse<?>) 
resp).getRouteResponses();
-      DocCollection coll = getDocCollection(collection, null);
-      Map<String, String> leaders = new HashMap<>();
-      for (Slice slice : coll.getActiveSlices()) {
-        Replica leader = slice.getLeader();
-        if (leader != null) {
-          String leaderUrl = leader.getBaseUrl() + "/" + leader.getCoreName();
-          leaders.put(leaderUrl, slice.getName());
-          String altLeaderUrl = leader.getBaseUrl() + "/" + collection;
-          leaders.put(altLeaderUrl, slice.getName());
-        }
-      }
+    ExpiringCachedDocCollection peek(Object key) {
+      return super.get(key);
+    }
 
-      Iterator<Map.Entry<String, NamedList<?>>> routeIter = routes.iterator();
-      while (routeIter.hasNext()) {
-        Map.Entry<String, NamedList<?>> next = routeIter.next();
-        String host = next.getKey();
-        NamedList<?> hostResp = next.getValue();
-        Integer rf =
-            (Integer) ((NamedList<?>) 
hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
-        if (rf != null) {
-          String shard = leaders.get(host);
-          if (shard == null) {
-            if (host.endsWith("/")) shard = leaders.get(host.substring(0, 
host.length() - 1));
-            if (shard == null) {
-              shard = host;
-            }
+    @Override
+    public ExpiringCachedDocCollection put(String key, 
ExpiringCachedDocCollection value) {
+      puts.incrementAndGet();
+      return super.put(key, value);
+    }
+
+    void evictStale() {
+      if (!evictLock.tryLock()) return;
+      try {
+        for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
+          if (e.getValue().isExpired(timeToLiveMs)) {
+            super.remove(e.getKey());
           }
-          results.put(shard, rf);
         }
+      } finally {
+        evictLock.unlock();
       }
     }
-    return results;
   }
 
-  /**
-   * Determines whether an UpdateRequest contains sufficient routing 
information to identify shard
-   * leaders for direct updates when directUpdatesToLeadersOnly is enabled.
-   */
-  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, 
String idField) {
-    final Map<SolrInputDocument, Map<String, Object>> documents = 
updateRequest.getDocumentsMap();
-    final Map<String, Map<String, Object>> deleteById = 
updateRequest.getDeleteByIdMap();
+  @SuppressWarnings({"rawtypes"})
+  public static class RouteResponse<T extends LBSolrClient.Req> extends 
NamedList<Object> {
+    private NamedList<NamedList<?>> routeResponses;
+    private Map<String, T> routes;
 
-    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
-    final boolean hasNoDeleteById = (deleteById == null || 
deleteById.isEmpty());
-    if (hasNoDocuments && hasNoDeleteById) {
-      // no documents and no delete-by-id, so no info to find leader(s)
-      return false;
+    public void setRouteResponses(NamedList<NamedList<?>> routeResponses) {
+      this.routeResponses = routeResponses;
     }
 
-    if (documents != null) {
-      for (final Map.Entry<SolrInputDocument, Map<String, Object>> entry : 
documents.entrySet()) {
-        final SolrInputDocument doc = entry.getKey();
-        final Object fieldValue = doc.getFieldValue(idField);
-        if (fieldValue == null) {
-          // a document with no id field value, so can't find leader for it
-          return false;
-        }
-      }
+    public NamedList<NamedList<?>> getRouteResponses() {
+      return routeResponses;
     }
 
-    if (deleteById != null) {
-      for (final Map.Entry<String, Map<String, Object>> entry : 
deleteById.entrySet()) {
-        final Map<String, Object> params = entry.getValue();
-        if (params == null || params.get(ShardParams._ROUTE_) == null) {
-          // deleteById entry lacks explicit route parameter, can't find 
leader for it
-          return false;
+    public void setRoutes(Map<String, T> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, T> getRoutes() {
+      return routes;
+    }
+  }
+
+  public static class RouteException extends SolrException {
+
+    private NamedList<Throwable> throwables;
+    private Map<String, ? extends LBSolrClient.Req> routes;
+
+    public RouteException(
+        ErrorCode errorCode,
+        NamedList<Throwable> throwables,
+        Map<String, ? extends LBSolrClient.Req> routes) {
+      super(errorCode, throwables.getVal(0).getMessage(), 
throwables.getVal(0));
+      this.throwables = throwables;
+      this.routes = routes;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      NamedList<String> metadata = new NamedList<String>();
+      for (int i = 0; i < throwables.size(); i++) {
+        Throwable t = throwables.getVal(i);
+        if (t instanceof SolrException e) {
+          NamedList<String> eMeta = e.getMetadata();
+          if (null != eMeta) {
+            metadata.addAll(eMeta);
+          }
         }
       }
+      if (0 < metadata.size()) {
+        this.setMetadata(metadata);
+      }
     }
 
-    return true;
+    public NamedList<Throwable> getThrowables() {
+      return throwables;
+    }
+
+    public Map<String, ? extends LBSolrClient.Req> getRoutes() {
+      return this.routes;
+    }
   }
 
   /** Universal connection string parser logic. */
@@ -1887,4 +1833,55 @@ public abstract class CloudSolrClient extends SolrClient 
{
       return String.join(",", quorumItems) + (isZookeeper && zkChroot != null 
? zkChroot : "");
     }
   }
+
+  class ExpiringCachedDocCollection {
+    final DocCollection cached;
+    final long cachedAtNano;
+    // This is the time at which the collection is retried and got the same 
old version
+    volatile long retriedAtNano = -1;
+    // flag that suggests that this is potentially to be rechecked
+    volatile boolean maybeStale = false;
+
+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAtNano = System.nanoTime();
+    }
+
+    boolean isExpired(long timeToLiveMs) {
+      return (System.nanoTime() - cachedAtNano)
+          > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
+    }
+
+    boolean shouldRetry() {
+      if (maybeStale) { // we are not sure if it is stale so check with retry 
time
+        if ((retriedAtNano == -1 || (System.nanoTime() - retriedAtNano) > 
retryExpiryTimeNano)) {
+          return true; // we retried a while back. and we could not get 
anything new.
+          // it's likely that it is not going to be available now also.
+        }
+      }
+      return false;
+    }
+
+    void setRetriedAt() {
+      retriedAtNano = System.nanoTime();
+    }
+
+    /**
+     * Marks this entry as {@code maybeStale} if the provided backoff window 
has elapsed since the
+     * last retry.
+     *
+     * @return {@code true} if the entry was flagged as maybe stale
+     */
+    boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
+      if (maybeStale) {
+        return true;
+      }
+      long lastRetry = retriedAtNano;
+      if (lastRetry != -1 && (System.nanoTime() - lastRetry) <= 
retryBackoffNano) {
+        return false;
+      }
+      maybeStale = true;
+      return true;
+    }
+  }
 }

Reply via email to