This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 58d34f10fb0 SOLR-16438: Support optional split.setPreferredLeaders prop in shard split command.
58d34f10fb0 is described below

commit 58d34f10fb0e933d1fe91f681ebee1f7306df041
Author: Bruno Roustant <[email protected]>
AuthorDate: Fri Dec 23 14:30:19 2022 +0100

    SOLR-16438: Support optional split.setPreferredLeaders prop in shard split command.
---
 ...istributedCollectionConfigSetCommandRunner.java |  12 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  | 146 ++++++++++------
 .../solr/handler/admin/CollectionsHandler.java     | 192 +++++++++++++++++---
 .../solr/handler/admin/RebalanceLeaders.java       |  90 +++++++++-
 .../test/org/apache/solr/cloud/SplitShardTest.java | 194 ++++++++++++++++-----
 .../solr/cloud/api/collections/ShardSplitTest.java |  13 +-
 .../solr/handler/admin/TestCollectionAPIs.java     |  15 +-
 .../deployment-guide/pages/shard-management.adoc   |  12 ++
 .../solrj/request/CollectionAdminRequest.java      |  11 +-
 .../solr/common/params/CommonAdminParams.java      |   6 +
 10 files changed, 546 insertions(+), 145 deletions(-)

diff --git 
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
 
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
index 07fe08f377b..caf001140ad 100644
--- 
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
+++ 
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
@@ -163,9 +163,9 @@ public class DistributedCollectionConfigSetCommandRunner {
   }
 
   /**
-   * When {@link 
org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue 
to
-   * overseer queue and instead calls this method, this method is expected to 
do the equivalent of
-   * what Overseer does in {@link
+   * When {@link 
org.apache.solr.handler.admin.CollectionsHandler#invokeOperation} does not 
enqueue
+   * to overseer queue and instead calls this method, this method is expected 
to do the equivalent
+   * of what Overseer does in {@link
    * org.apache.solr.cloud.OverseerConfigSetMessageHandler#processMessage}.
    *
    * <p>The steps leading to that call in the Overseer execution path are (and 
the equivalent is
@@ -235,9 +235,9 @@ public class DistributedCollectionConfigSetCommandRunner {
   }
 
   /**
-   * When {@link 
org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue 
to
-   * overseer queue and instead calls this method, this method is expected to 
do the equivalent of
-   * what Overseer does in {@link
+   * When {@link 
org.apache.solr.handler.admin.CollectionsHandler#invokeOperation} does not 
enqueue
+   * to overseer queue and instead calls this method, this method is expected 
to do the equivalent
+   * of what Overseer does in {@link
    * 
org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler#processMessage}.
    *
    * <p>The steps leading to that call in the Overseer execution path are (and 
the equivalent is
diff --git 
a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java 
b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 23473694b59..3b3ab835359 100644
--- 
a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ 
b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -28,7 +28,9 @@ import static 
org.apache.solr.common.params.CollectionParams.CollectionAction.CR
 import static 
org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.NUM_SUB_SHARDS;
+import static 
org.apache.solr.handler.admin.CollectionsHandler.AUTO_PREFERRED_LEADERS;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,6 +48,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.VersionedData;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
@@ -114,21 +117,22 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
    * <ul>
    *   <li>1. Verify that there is enough disk space to create sub-shards.
    *   <li>2. If splitByPrefix is true, make request to get prefix ranges.
-   *   <li>3. If this split was attempted previously and there are lingering 
sub-shards, delete
-   *       them.
-   *   <li>4. Create sub-shards in CONSTRUCTION state.
-   *   <li>5. Add an initial replica to each sub-shard.
-   *   <li>6. Request that parent shard wait for children to become ACTIVE.
-   *   <li>7. Execute split: either LINK or REWRITE.
-   *   <li>8. Apply buffered updates to the sub-shards so they are up-to-date 
with parent.
-   *   <li>9. Determine node placement for additional replicas (but do not 
create yet).
-   *   <li>10. If replicationFactor is more than 1, set shard state for 
sub-shards to RECOVERY; else
+   *   <li>3. Fill the sub-shards ranges.
+   *   <li>4. If this split was attempted previously and there are lingering 
INACTIVE sub-shards,
+   *       delete them.
+   *   <li>5. Create sub-shards in CONSTRUCTION state.
+   *   <li>6. Add an initial replica to each sub-shard.
+   *   <li>7. Request that parent shard wait for children to become ACTIVE.
+   *   <li>8. Execute split: either LINK or REWRITE.
+   *   <li>9. Apply buffered updates to the sub-shards so they are up-to-date 
with parent.
+   *   <li>10. Determine node placement for additional replicas (but do not 
create yet).
+   *   <li>11. If replicationFactor is more than 1, set shard state for 
sub-shards to RECOVERY; else
    *       mark ACTIVE.
-   *   <li>11. Create additional replicas of sub-shards.
+   *   <li>12. Create additional replicas of sub-shards.
+   *   <li>13. If setPreferredLeaders param is true, set the preferred leader 
property on one
+   *       replica of each sub-shard. Distribute preferred leaders evenly 
among the nodes.
    * </ul>
    *
-   * <br>
-   *
    * <p>There is a shard split doc (dev-docs/shard-split/shard-split.adoc) on 
how shard split works;
    * illustrated with diagrams.
    */
@@ -192,19 +196,25 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Interrupted.");
     }
 
+    boolean setPreferredLeaders =
+        message.getBool(
+            CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS,
+            Boolean.getBoolean(AUTO_PREFERRED_LEADERS));
+
+    // 1. Verify that there is enough disk space to create sub-shards.
     RTimerTree t;
-    if (ccc.getCoreContainer().getNodeConfig().getMetricsConfig().isEnabled()) 
{
-      // check disk space for shard split
-      if 
(Boolean.parseBoolean(System.getProperty(SHARDSPLIT_CHECKDISKSPACE_ENABLED, 
"true"))) {
-        // 1. verify that there is enough space on disk to create sub-shards
+    if (ccc.getCoreContainer().getNodeConfig().getMetricsConfig().isEnabled()
+        && 
Boolean.parseBoolean(System.getProperty(SHARDSPLIT_CHECKDISKSPACE_ENABLED, 
"true"))) {
+      if (log.isDebugEnabled()) {
         log.debug(
-            "SplitShardCmd: verify that there is enough space on disk to 
create sub-shards for slice: {}",
+            "Check disk space before splitting shard {} on replica {}",
+            slice.get(),
             parentShardLeader);
-        t = timings.sub("checkDiskSpace");
-        checkDiskSpace(
-            collectionName, slice.get(), parentShardLeader, splitMethod, 
ccc.getSolrCloudManager());
-        t.stop();
       }
+      t = timings.sub("checkDiskSpace");
+      checkDiskSpace(
+          collectionName, slice.get(), parentShardLeader, splitMethod, 
ccc.getSolrCloudManager());
+      t.stop();
     }
 
     // let's record the ephemeralOwner of the parent leader node
@@ -283,8 +293,7 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
 
       ShardHandler shardHandler = ccc.newShardHandler();
 
-      // 2. if split request has splitByPrefix set to true, make request to 
SplitOp to get prefix
-      // ranges of sub-shards
+      // 2. If splitByPrefix is true, make request to get prefix ranges.
       if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
         t = timings.sub("getRanges");
 
@@ -328,8 +337,8 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
         t.stop();
       }
 
+      // 3. Fill the sub-shards ranges.
       t = timings.sub("fillRanges");
-
       String rangesStr =
           fillRanges(
               ccc.getSolrCloudManager(),
@@ -342,8 +351,8 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
               firstNrtReplica);
       t.stop();
 
-      // 3. if this shard has attempted a split before and failed, there will 
be lingering INACTIVE
-      // sub-shards.  Clean these up before proceeding
+      // 4. If this split was attempted previously and there are lingering 
INACTIVE sub-shards,
+      // delete them.
       boolean oldShardsDeleted = false;
       for (String subSlice : subSlices) {
         Slice oSlice = collection.getSlice(subSlice);
@@ -374,16 +383,14 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
           }
         }
       }
-
       if (oldShardsDeleted) {
         // refresh the locally cached cluster state
         // we know we have the latest because otherwise deleteshard would have 
failed
         clusterState = zkStateReader.getClusterState();
       }
 
-      // 4. create the child sub-shards in CONSTRUCTION state
+      // 5. Create sub-shards in CONSTRUCTION state.
       String nodeName = parentShardLeader.getNodeName();
-
       t = timings.sub("createSubSlicesAndLeadersInState");
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = subSlices.get(i);
@@ -419,7 +426,7 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
             CollectionHandlingUtils.waitForNewShard(
                 collectionName, subSlice, ccc.getZkStateReader());
 
-        // 5. and add the initial replica for each sub-shard
+        // 6. Add an initial replica to each sub-shard.
         log.debug(
             "Adding first replica {} as part of slice {} of collection {} on 
{}",
             subShardName,
@@ -459,7 +466,7 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
       }
       t.stop();
 
-      // 6. request that parent shard wait for children to become active
+      // 7. Request that parent shard wait for children to become ACTIVE.
       t = timings.sub("waitForSubSliceLeadersAlive");
       {
         final ShardRequestTracker shardRequestTracker =
@@ -489,12 +496,6 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
       }
       t.stop();
 
-      log.debug(
-          "Successfully created all sub-shards for collection {} parent shard: 
{} on: {}",
-          collectionName,
-          slice,
-          parentShardLeader);
-
       if (log.isInfoEnabled()) {
         log.info(
             "Splitting shard {} as part of slice {} of collection {} on {}",
@@ -504,7 +505,8 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
             parentShardLeader);
       }
 
-      // 7. execute actual split
+      // 8. Execute split: either LINK or REWRITE.
+      t = timings.sub("splitParentCore");
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, 
CoreAdminParams.CoreAdminAction.SPLIT.toString());
       params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
@@ -514,8 +516,6 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
         params.add(CoreAdminParams.TARGET_CORE, subShardName);
       }
       params.set(CoreAdminParams.RANGES, rangesStr);
-
-      t = timings.sub("splitParentCore");
       {
         final ShardRequestTracker shardRequestTracker =
             CollectionHandlingUtils.asyncRequestTracker(asyncId, ccc);
@@ -526,12 +526,11 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
         handleFailureOnAsyncRequest(results, msgOnError);
       }
       t.stop();
-
       if (log.isDebugEnabled()) {
         log.debug("Index on shard: {} split into {} successfully", nodeName, 
subShardNames.size());
       }
 
-      // 8. apply buffered updates on sub-shards
+      // 9. Apply buffered updates to the sub-shards, so they are up-to-date 
with parent.
       t = timings.sub("applyBufferedUpdates");
       {
         final ShardRequestTracker shardRequestTracker =
@@ -557,9 +556,9 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
         handleFailureOnAsyncRequest(results, msgOnError);
       }
       t.stop();
-
       log.debug("Successfully applied buffered updates on : {}", 
subShardNames);
 
+      // 10. Determine node placement for additional replicas (but do not 
create yet).
       // TODO: change this to handle sharding a slice into > 2 sub-shards.
 
       // we have already created one subReplica for each subShard on the 
parent node.
@@ -737,8 +736,8 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
       // this ensures that the logic inside ReplicaMutator to update sub-shard 
state to 'active'
       // always gets a chance to execute. See SOLR-7673
 
-      // 10. if replicationFactor > 1, set shard state for sub-shards to 
RECOVERY; otherwise mark
-      // ACTIVE
+      // 11. If replicationFactor is more than 1, set shard state for 
sub-shards to RECOVERY; else
+      // mark ACTIVE.
       if (repFactor == 1) {
         // A commit is needed so that documents are visible when the sub-shard 
replicas come up
         // (Note: This commit used to be after the state switch, but was 
brought here before the
@@ -802,14 +801,12 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
             });
       }
 
+      // 12. Create additional replicas of sub-shards.
       t = timings.sub("createCoresForReplicas");
-      // 11. now actually create replica cores on sub shard nodes
       for (Map<String, Object> replica : replicas) {
         new AddReplicaCmd(ccc).addReplica(clusterState, new 
ZkNodeProps(replica), results, null);
       }
-
       assert TestInjection.injectSplitFailureAfterReplicaCreation();
-
       {
         final ShardRequestTracker syncRequestTracker =
             CollectionHandlingUtils.syncRequestTracker(ccc);
@@ -818,9 +815,45 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
         handleFailureOnAsyncRequest(results, msgOnError);
       }
       t.stop();
-
       log.info("Successfully created all replica shards for all sub-slices 
{}", subSlices);
 
+      // 13. If setPreferredLeaders param is true, set the preferred leader 
property on one replica
+      // of each sub-shard. Distribute preferred leaders evenly among the 
nodes.
+      if (setPreferredLeaders && repFactor > 1) {
+        t = timings.sub("setPreferredLeaders");
+        log.info("Setting the preferred leaders");
+        clusterState = zkStateReader.getClusterState();
+        collection = clusterState.getCollection(collectionName);
+
+        // Keep the leader on the current node for the first sub-shard.
+        Map<String, Integer> numLeadersPerNode = new HashMap<>();
+        {
+          String subSliceName = subSlices.get(0);
+          Replica replica = collection.getSlice(subSliceName).getLeader();
+          setPreferredLeaderProp(collectionName, subSliceName, 
replica.getName());
+          numLeadersPerNode.put(replica.getNodeName(), 1);
+        }
+
+        // Distribute the preferred leaders for the other sub-shards evenly 
among the nodes.
+        for (String subSliceName : subSlices.subList(1, subSlices.size())) {
+          Slice subSlice = collection.getSlice(subSliceName);
+          Replica selectedReplica = null;
+          int minNumLeaders = Integer.MAX_VALUE;
+          for (Replica replica : subSlice.getReplicas()) {
+            int numLeaders = 
numLeadersPerNode.getOrDefault(replica.getNodeName(), 0);
+            if (numLeaders < minNumLeaders) {
+              selectedReplica = replica;
+              minNumLeaders = numLeaders;
+            }
+          }
+          assert selectedReplica != null;
+          setPreferredLeaderProp(collectionName, subSliceName, 
selectedReplica.getName());
+          numLeadersPerNode.compute(
+              selectedReplica.getNodeName(), (__, n) -> n == null ? 1 : n + 1);
+        }
+        t.stop();
+      }
+
       // The final commit was added in SOLR-4997 so that documents are visible
       // when the sub-shard replicas come up
       if (repFactor > 1) {
@@ -832,6 +865,9 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
       if (withTiming) {
         results.add(CommonParams.TIMING, timings.asNamedList());
       }
+      if (log.isInfoEnabled()) {
+        log.info("Timings for split operations: {}", timings.asNamedList());
+      }
       success = true;
       // don't unlock the shard yet - only do this if the final switch-over in 
ReplicaMutator
       // succeeds (or fails)
@@ -854,6 +890,15 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
     }
   }
 
+  private void setPreferredLeaderProp(String collectionName, String shardName, 
String replicaName)
+      throws IOException {
+    log.info("Setting replica {} as the preferred leader of shard {}", 
replicaName, shardName);
+    CollectionAdminRequest.AddReplicaProp addProp =
+        CollectionAdminRequest.addReplicaProperty(
+            collectionName, shardName, replicaName, "preferredleader", "true");
+    ccc.getSolrCloudManager().request(addProp);
+  }
+
   /**
    * In case of async requests, the ShardRequestTracker's processResponses() 
does not abort on
    * failure (as it should). Handling this here temporarily for now.
@@ -1227,6 +1272,7 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
 
   public static boolean lockForSplit(SolrCloudManager cloudManager, String 
collection, String shard)
       throws Exception {
+    log.debug("Getting lock for shard {} split", shard);
     String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + 
shard + "-splitting";
     final DistribStateManager stateManager = 
cloudManager.getDistribStateManager();
     synchronized (stateManager) {
@@ -1250,12 +1296,14 @@ public class SplitShardCmd implements 
CollApiCmds.CollectionApiCommand {
                 + shard,
             e);
       }
+      log.debug("Obtained lock for shard {} split", shard);
       return true;
     }
   }
 
   public static void unlockForSplit(SolrCloudManager cloudManager, String 
collection, String shard)
       throws Exception {
+    log.debug("Releasing lock for shard {} split", shard);
     if (shard != null) {
       String path =
           ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + 
"-splitting";
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java 
b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 102077b1bc3..750a43b985a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -106,6 +106,7 @@ import static 
org.apache.solr.common.params.CommonAdminParams.NUM_SUB_SHARDS;
 import static org.apache.solr.common.params.CommonAdminParams.SPLIT_BY_PREFIX;
 import static org.apache.solr.common.params.CommonAdminParams.SPLIT_FUZZ;
 import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
+import static 
org.apache.solr.common.params.CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS;
 import static 
org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.params.CommonParams.TIMING;
@@ -133,6 +134,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -244,6 +246,13 @@ import org.slf4j.LoggerFactory;
 public class CollectionsHandler extends RequestHandlerBase implements 
PermissionNameProvider {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * Boolean system property to automatically set the preferred leaders at 
collection creation and
+   * during shard split. Default false. Otherwise, the caller needs to set it 
for each {@link
+   * CollectionAdminRequest#splitShard(String) SplitShard} request.
+   */
+  public static final String AUTO_PREFERRED_LEADERS = 
"solr.autoPreferredLeaders";
+
   protected final CoreContainer coreContainer;
   private final Optional<DistributedCollectionConfigSetCommandRunner>
       distributedCollectionConfigSetCommandRunner;
@@ -306,7 +315,7 @@ public class CollectionsHandler extends RequestHandlerBase 
implements Permission
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) 
throws Exception {
     // Make sure the cores is enabled
-    CoreContainer cores = checkErrors();
+    checkCoreContainer();
 
     // Pick the action
     SolrParams params = req.getParams();
@@ -329,24 +338,24 @@ public class CollectionsHandler extends 
RequestHandlerBase implements Permission
     }
 
     CollectionOperation operation = CollectionOperation.get(action);
-    invokeAction(req, rsp, cores, action, operation);
+    for (CollectionOperation op : operation.getCombinedOps(req, this)) {
+      invokeOperation(req, rsp, op);
+      if (rsp.getException() != null) {
+        log.warn("Operation {} failed with exception, skipping subsequent 
operations", op);
+        break;
+      }
+    }
     rsp.setHttpCaching(false);
   }
 
-  protected CoreContainer checkErrors() {
-    CoreContainer cores = getCoreContainer();
-    AdminAPIBase.validateZooKeeperAwareCoreContainer(cores);
-    return cores;
+  protected void checkCoreContainer() {
+    AdminAPIBase.validateZooKeeperAwareCoreContainer(getCoreContainer());
   }
 
   @SuppressWarnings({"unchecked"})
-  void invokeAction(
-      SolrQueryRequest req,
-      SolrQueryResponse rsp,
-      CoreContainer cores,
-      CollectionAction action,
-      CollectionOperation operation)
+  void invokeOperation(SolrQueryRequest req, SolrQueryResponse rsp, 
CollectionOperation operation)
       throws Exception {
+    log.debug("Invoking {}", operation);
     if (!coreContainer.isZooKeeperAware()) {
       throw new SolrException(
           BAD_REQUEST, "Invalid request. collections can be accessed only in 
SolrCloud mode");
@@ -377,10 +386,10 @@ public class CollectionsHandler extends 
RequestHandlerBase implements Permission
     // Even if Overseer does wait for the collection to be created, it sees a 
different cluster
     // state than this node, so this wait is required to make sure the local 
node Zookeeper watches
     // fired and now see the collection.
-    if (action.equals(CollectionAction.CREATE) && asyncId == null) {
-      if (rsp.getException() == null) {
-        waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
-      }
+    if (operation.equals(CollectionOperation.CREATE_OP)
+        && asyncId == null
+        && rsp.getException() == null) {
+      waitForActiveCollection(zkProps.getStr(NAME), getCoreContainer(), 
overseerResponse);
     }
   }
 
@@ -886,7 +895,6 @@ public class CollectionsHandler extends RequestHandlerBase 
implements Permission
         SPLITSHARD,
         DEFAULT_COLLECTION_OP_TIMEOUT * 5,
         (req, rsp, h) -> {
-          String name = req.getParams().required().get(COLLECTION_PROP);
           // TODO : add support for multiple shards
           String shard = req.getParams().get(SHARD_ID_PROP);
           String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
@@ -932,8 +940,75 @@ public class CollectionsHandler extends RequestHandlerBase 
implements Permission
                   SPLIT_FUZZ,
                   SPLIT_BY_PREFIX,
                   FOLLOW_ALIASES,
-                  CREATE_NODE_SET_PARAM);
+                  CREATE_NODE_SET_PARAM,
+                  SPLIT_SET_PREFERRED_LEADERS);
           return copyPropertiesWithPrefix(req.getParams(), map, 
PROPERTY_PREFIX);
+        },
+        SplitShardHelper.OP_COMBINER),
+    /**
+     * Waits for a shard split to complete. Waits until the shard state is 
switched to INACTIVE (in
+     * ReplicaMutator.checkAndCompleteShardSplit). At the same time, the 
sub-shards states become
+     * ACTIVE.
+     */
+    WAIT_FOR_SHARD_SPLIT_OP(
+        null,
+        (req, rsp, h) -> {
+          String collectionName = req.getParams().get(COLLECTION_PROP);
+          String shardName = req.getParams().get(SHARD_ID_PROP);
+          log.info("Waiting for shard {} split to complete", shardName);
+          long startTime = System.nanoTime();
+          h.coreContainer
+              .getZkController()
+              .getZkStateReader()
+              .waitForState(
+                  collectionName,
+                  1,
+                  TimeUnit.HOURS,
+                  collection -> {
+                    Slice splitSlice = collection.getSlice(shardName);
+                    boolean splitComplete =
+                        splitSlice == null || 
splitSlice.getState().equals(Slice.State.INACTIVE);
+                    if (splitComplete) {
+                      if (log.isInfoEnabled()) {
+                        log.info(
+                            "Shard {} split completed in {} ms",
+                            shardName,
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTime));
+                      }
+                    }
+                    return splitComplete;
+                  });
+          return null;
+        }),
+    /** Waits for the shard preferred leader to become the leader. */
+    WAIT_FOR_PREFERRED_LEADER_OP(
+        null,
+        (req, rsp, h) -> {
+          String collectionName = req.getParams().get(COLLECTION_PROP);
+          String shardName = req.getParams().get(SHARD_ID_PROP);
+          log.info("Waiting for shard {} preferred leader to become the 
leader", shardName);
+          long startTime = System.nanoTime();
+          h.coreContainer
+              .getZkController()
+              .getZkStateReader()
+              .waitForState(
+                  collectionName,
+                  5,
+                  TimeUnit.MINUTES,
+                  collection -> {
+                    boolean isLeader =
+                        
SplitShardHelper.isPreferredLeaderCurrentLeader(collection, shardName, h);
+                    if (isLeader) {
+                      if (log.isInfoEnabled()) {
+                        log.info(
+                            "Shard {} preferred leader is leader in {} ms",
+                            shardName,
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTime));
+                      }
+                    }
+                    return isLeader;
+                  });
+          return null;
         }),
     DELETESHARD_OP(
         DELETESHARD,
@@ -1860,22 +1935,33 @@ public class CollectionsHandler extends 
RequestHandlerBase implements Permission
     public final CollectionOp fun;
     final CollectionAction action;
     final long timeOut;
+    final CollectionOpCombiner opCombiner;
 
     CollectionOperation(CollectionAction action, CollectionOp fun) {
-      this(action, DEFAULT_COLLECTION_OP_TIMEOUT, fun);
+      this(action, DEFAULT_COLLECTION_OP_TIMEOUT, fun, 
CollectionOpCombiner.SINGLE_OP);
     }
 
-    CollectionOperation(CollectionAction action, long timeOut, CollectionOp 
fun) {
+    CollectionOperation(
+        CollectionAction action, long timeOut, CollectionOp fun, 
CollectionOpCombiner opCombiner) {
       this.action = action;
       this.timeOut = timeOut;
       this.fun = fun;
+      this.opCombiner = opCombiner;
+      if (action != null) {
+        OperationMap.map.put(action, this);
+      }
     }
 
     public static CollectionOperation get(CollectionAction action) {
-      for (CollectionOperation op : values()) {
-        if (op.action == action) return op;
+      CollectionOperation op = OperationMap.map.get(action);
+      if (op == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " + 
action);
       }
-      throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " + 
action);
+      return op;
+    }
+
+    List<CollectionOperation> getCombinedOps(SolrQueryRequest req, 
CollectionsHandler h) {
+      return opCombiner.getCombinedOps(this, req, h);
     }
 
     @Override
@@ -1883,6 +1969,58 @@ public class CollectionsHandler extends 
RequestHandlerBase implements Permission
         SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) 
throws Exception {
       return fun.execute(req, rsp, h);
     }
+
+    private static class OperationMap {
+      static final Map<CollectionAction, CollectionOperation> map =
+          new EnumMap<>(CollectionAction.class);
+    }
+
+    private static class SplitShardHelper {
+      static final CollectionOpCombiner OP_COMBINER =
+          (op, req, h) -> {
+            if (!req.getParams()
+                .getBool(SPLIT_SET_PREFERRED_LEADERS, 
Boolean.getBoolean(AUTO_PREFERRED_LEADERS))) {
+              return Collections.singletonList(op);
+            }
+            // The split.setPreferredLeaders prop is true.
+            List<CollectionOperation> opSequence = new ArrayList<>();
+            String collectionName = req.getParams().get(COLLECTION_PROP);
+            String shardName = req.getParams().get(SHARD_ID_PROP);
+            DocCollection collection =
+                h.coreContainer
+                    .getZkController()
+                    .getZkStateReader()
+                    .getClusterState()
+                    .getCollection(collectionName);
+            // Ensure we split a preferred leader to help cluster balancing.
+            if (!isPreferredLeaderCurrentLeader(collection, shardName, h)) {
+              // A replica of the shard is defined as preferred leader, but is 
not the current
+              // leader yet.
+              opSequence.add(REBALANCELEADERS_OP); // Rebalance the leader on 
shard.
+              opSequence.add(WAIT_FOR_PREFERRED_LEADER_OP); // Wait for the 
rebalancing completion.
+            }
+            opSequence.add(op); // Split the shard and set the sub-shards 
preferred leaders.
+            opSequence.add(WAIT_FOR_SHARD_SPLIT_OP); // Wait for the shard 
split completion.
+            opSequence.add(REBALANCELEADERS_OP); // Rebalance the leaders on 
the sub-shards.
+            return opSequence;
+          };
+
+      static boolean isPreferredLeaderCurrentLeader(
+          DocCollection collection, String shardName, CollectionsHandler h) {
+        Slice slice = collection.getSlice(shardName);
+        if (slice == null) {
+          throw new SolrException(
+              ErrorCode.BAD_REQUEST, "Shard '" + shardName + "' does not 
exist, no action taken.");
+        }
+        for (Replica replica : slice.getReplicas()) {
+          if (replica.getBool(PROPERTY_PREFIX + "preferredleader", false)) {
+            return replica.equals(slice.getLeader());
+          }
+        }
+        // No replicas have the preferred leader property.
+        return true;
+      }
+    }
   }
 
   private static void forceLeaderElection(SolrQueryRequest req, 
CollectionsHandler handler) {
@@ -2069,6 +2207,14 @@ public class CollectionsHandler extends 
RequestHandlerBase implements Permission
         throws Exception;
   }
 
+  interface CollectionOpCombiner {
+
+    CollectionOpCombiner SINGLE_OP = (op, req, h) -> 
Collections.singletonList(op);
+
+    List<CollectionOperation> getCombinedOps(
+        CollectionOperation op, SolrQueryRequest req, CollectionsHandler h);
+  }
+
   @Override
   public Boolean registerV2() {
     return Boolean.TRUE;
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java 
b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index 42aa03c54ea..f354ed18589 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -127,6 +127,9 @@ class RebalanceLeaders {
 
   void execute() throws KeeperException, InterruptedException {
     DocCollection dc = checkParams();
+    if (log.isInfoEnabled()) {
+      log.info("Rebalancing leaders for collection {}", dc.getName());
+    }
 
     int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
     if (max <= 0) max = Integer.MAX_VALUE;
@@ -139,7 +142,9 @@ class RebalanceLeaders {
     for (Slice slice : dc.getSlices()) {
       ensurePreferredIsLeader(slice);
       if (asyncRequests.size() == max) {
-        log.info("Queued {} leader reassignments, waiting for some to 
complete.", max);
+        log.info(
+            "Max number '{}' of election queue movements requested, waiting 
for some to complete.",
+            max);
         keepGoing = waitAsyncRequests(maxWaitSecs, false);
         if (keepGoing == false) {
           break; // If we've waited longer than specified, don't continue to 
wait!
@@ -147,13 +152,14 @@ class RebalanceLeaders {
       }
     }
     if (keepGoing == true) {
+      log.info("Waiting for all election queue movements to complete");
       keepGoing = waitAsyncRequests(maxWaitSecs, true);
     }
     if (keepGoing == true) {
-      log.info("All leader reassignments completed.");
+      log.info("All election queue movements completed.");
     } else {
       log.warn(
-          "Exceeded specified timeout of '{}' all leaders may not have been 
reassigned'",
+          "Exceeded specified timeout of '{}' while waiting for election queue 
movements to complete",
           maxWaitSecs);
     }
 
@@ -197,6 +203,8 @@ class RebalanceLeaders {
   // Once we've done all the fiddling with the queues, check on the way out to 
see if all the active
   // preferred leaders that we intended to change are in fact the leaders.
   private void checkLeaderStatus() throws InterruptedException, 
KeeperException {
+    log.info("Waiting for all leader reassignments to complete");
+    long startTime = System.nanoTime();
     for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) {
       ClusterState clusterState = 
coreContainer.getZkController().getClusterState();
       Set<String> liveNodes = clusterState.getLiveNodes();
@@ -210,6 +218,12 @@ class RebalanceLeaders {
                 // Record for return that the leader changed successfully
                 pendingOps.remove(slice.getName());
                 addToSuccesses(slice, replica);
+                if (log.isDebugEnabled()) {
+                  log.debug(
+                      "Preferred leader replica {} is the leader of shard {}",
+                      replica.getName(),
+                      slice.getName());
+                }
                 break;
               }
             }
@@ -220,6 +234,13 @@ class RebalanceLeaders {
       
coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
     }
     addAnyFailures();
+    if (log.isInfoEnabled()) {
+      log.info(
+          "Finished waiting {} ms for leader reassignments with {} pending ops 
{}",
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime),
+          pendingOps.size(),
+          pendingOps);
+    }
   }
 
   // The process is:
@@ -232,25 +253,47 @@ class RebalanceLeaders {
   // node in the leader election queue is removed and the only remaining node 
watching it is
   // triggered to become leader.
   private void ensurePreferredIsLeader(Slice slice) throws KeeperException, 
InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("Ensuring preferred leader is leader for shard {}", 
slice.getName());
+    }
     for (Replica replica : slice.getReplicas()) {
       // Tell the replica to become the leader if we're the preferred leader 
AND active AND not the
       // leader already
       if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) 
{
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Replica {} of shard {} is not preferred leader", 
replica.getName(), slice.getName());
+        }
         continue;
       }
       // OK, we are the preferred leader, are we the actual leader?
       if (replica.getBool(LEADER_PROP, false)) {
         // We're a preferred leader, but we're _also_ the leader, don't need 
to do anything.
         addAlreadyLeaderToResults(slice, replica);
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Replica {} of shard {} is preferred leader and already leader",
+              replica.getName(),
+              slice.getName());
+        }
         return; // already the leader, do nothing.
       }
       ZkStateReader zkStateReader = 
coreContainer.getZkController().getZkStateReader();
       // We're the preferred leader, but someone else is leader. Only become 
leader if we're active.
       if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) == 
false) {
         addInactiveToResults(slice, replica);
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Replica {} of shard {} is preferred leader but not active",
+              replica.getName(),
+              slice.getName());
+        }
         return; // Don't try to become the leader if we're not active!
       }
 
+      if (log.isDebugEnabled()) {
+        log.debug("Getting the sorted election nodes for shard {}", 
slice.getName());
+      }
       List<String> electionNodes =
           OverseerTaskProcessor.getSortedElectionNodes(
               zkStateReader.getZkClient(),
@@ -273,6 +316,12 @@ class RebalanceLeaders {
       String firstWatcher = electionNodes.get(1);
 
       if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == 
false) {
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Re-enqueue replica {} to become the first watcher in election 
for shard {}",
+              replica.getName(),
+              slice.getName());
+        }
         makeReplicaFirstWatcher(slice, replica);
       }
 
@@ -280,10 +329,31 @@ class RebalanceLeaders {
       // to check at the end
       pendingOps.put(slice.getName(), replica.getName());
       String leaderElectionNode = electionNodes.get(0);
-      String coreName =
-          
slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP);
+      Replica leaderReplica = 
slice.getReplica(LeaderElector.getNodeName(leaderElectionNode));
+      String coreName = leaderReplica.getStr(CORE_NAME_PROP);
+      if (log.isDebugEnabled()) {
+        log.debug(
+            "Move replica {} node {} core {} at the end of the election queue 
for shard {}",
+            leaderReplica.getName(),
+            leaderElectionNode,
+            coreName,
+            slice.getName());
+      }
       rejoinElectionQueue(slice, leaderElectionNode, coreName, false);
+      if (log.isDebugEnabled()) {
+        log.debug(
+            "Waiting for replica {} node {} change in the election queue for 
shard {}",
+            leaderReplica.getName(),
+            leaderElectionNode,
+            slice.getName());
+      }
       waitForNodeChange(slice, leaderElectionNode);
+      if (log.isDebugEnabled()) {
+        log.debug(
+            "Preferred leader {} is now the first in line for leader election 
for shard {}",
+            replica.getName(),
+            slice.getName());
+      }
 
       return; // Done with this slice, skip the rest of the replicas.
     }
@@ -419,6 +489,13 @@ class RebalanceLeaders {
   // that any requeueing we've done has happened.
   int waitForNodeChange(Slice slice, String electionNode)
       throws InterruptedException, KeeperException {
+    if (log.isDebugEnabled()) {
+      log.debug(
+          "Waiting for node {} to rejoin the election queue for shard {}",
+          electionNode,
+          slice.getName());
+    }
+    long startTime = System.nanoTime();
     String nodeName = LeaderElector.getNodeName(electionNode);
     int oldSeq = LeaderElector.getSeq(electionNode);
     for (int idx = 0; idx < 600; ++idx) {
@@ -437,6 +514,9 @@ class RebalanceLeaders {
       TimeUnit.MILLISECONDS.sleep(100);
       zkStateReader.forciblyRefreshAllClusterStateSlow();
     }
+    log.warn(
+        "Timeout waiting for node change after {} ms",
+        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
     return -1;
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java 
b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index dd20903deed..ddcd5c57d55 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -17,22 +17,23 @@
 
 package org.apache.solr.cloud;
 
+import static 
org.apache.solr.handler.admin.CollectionsHandler.AUTO_PREFERRED_LEADERS;
 import static org.hamcrest.core.StringContains.containsString;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -44,12 +45,11 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.embedded.JettySolrRunner;
 import org.hamcrest.MatcherAssert;
 import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,27 +59,51 @@ public class SplitShardTest extends SolrCloudTestCase {
 
   private final String COLLECTION_NAME = "splitshardtest-collection";
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    System.setProperty("metricsEnabled", "true");
-    configureCluster(1).addConfig("conf", 
configset("cloud-minimal")).configure();
-  }
-
-  @Before
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-  }
-
   @After
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
     cluster.deleteAllCollections();
+    shutdownCluster();
   }
 
   @Test
-  public void doTest() throws IOException, SolrServerException {
+  public void testSplitOneHostTwoSubShardsTwoReplicas() throws Exception {
+    setupCluster(1);
+    innerTestSplitTwoSubShardsTwoReplicas();
+  }
+
+  @Test
+  public void testSplitTwoHostsTwoSubShardsTwoReplicas() throws Exception {
+    setupCluster(2);
+    innerTestSplitTwoSubShardsTwoReplicas();
+  }
+
+  @Test
+  public void testSplitThreeHostsTwoSubShardsTwoReplicas() throws Exception {
+    setupCluster(3);
+    innerTestSplitTwoSubShardsTwoReplicas();
+  }
+
+  private void innerTestSplitTwoSubShardsTwoReplicas() throws Exception {
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 1, 2)
+        .process(cluster.getSolrClient());
+
+    cluster.waitForActiveCollection(COLLECTION_NAME, 1, 2);
+
+    CollectionAdminRequest.SplitShard splitShard =
+        CollectionAdminRequest.splitShard(COLLECTION_NAME)
+            .setNumSubShards(2)
+            .setShardName("shard1");
+    splitShard.process(cluster.getSolrClient());
+    waitForState(
+        "Timed out waiting for sub shards to be active", COLLECTION_NAME, 
activeClusterShape(2, 6));
+  }
+
+  @Test
+  public void testSplitOneHostFiveSubShardsOneReplica() throws Exception {
+    setupCluster(1);
+
     CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
         .process(cluster.getSolrClient());
 
@@ -130,7 +154,8 @@ public class SplitShardTest extends SolrCloudTestCase {
   }
 
   @Test
-  public void multipleOptionsSplitTest() {
+  public void multipleOptionsSplitTest() throws Exception {
+    setupCluster(1);
     CollectionAdminRequest.SplitShard splitShard =
         CollectionAdminRequest.splitShard(COLLECTION_NAME)
             .setNumSubShards(5)
@@ -145,6 +170,7 @@ public class SplitShardTest extends SolrCloudTestCase {
 
   @Test
   public void testSplitFuzz() throws Exception {
+    setupCluster(1);
     String collectionName = "splitFuzzCollection";
     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
         .process(cluster.getSolrClient());
@@ -176,7 +202,11 @@ public class SplitShardTest extends SolrCloudTestCase {
     assertEquals("wrong range in s1_1", expected1, delta1);
   }
 
-  CloudSolrClient createCollection(String collectionName, int repFactor) 
throws Exception {
+  private void setupCluster(int nodeCount) throws Exception {
+    configureCluster(nodeCount).addConfig("conf", 
configset("cloud-minimal")).configure();
+  }
+
+  private CloudSolrClient createCollection(String collectionName, int 
repFactor) throws Exception {
 
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 
repFactor)
         .process(cluster.getSolrClient());
@@ -188,7 +218,7 @@ public class SplitShardTest extends SolrCloudTestCase {
     return client;
   }
 
-  long getNumDocs(CloudSolrClient client) throws Exception {
+  private long getNumDocs(CloudSolrClient client) throws Exception {
     String collectionName = client.getDefaultCollection();
     DocCollection collection = 
client.getClusterState().getCollection(collectionName);
     Collection<Slice> slices = collection.getSlices();
@@ -225,13 +255,35 @@ public class SplitShardTest extends SolrCloudTestCase {
     return totCount;
   }
 
-  void doLiveSplitShard(String collectionName, int repFactor, int nThreads) 
throws Exception {
+  @Test
+  public void testConcurrentSplitOneHostRepFactorOne() throws Exception {
+    setupCluster(1);
+    // Debugging tips: if this fails, it may be easier to debug by lowering 
the number of threads to
+    // 1 and looping the test until you get another failure.
+    // You may need to further instrument things like 
DistributedZkUpdateProcessor to display the
+    // cluster state for the collection, etc. Using more threads increases the 
chance to hit a
+    // concurrency bug, but too many threads can overwhelm single-threaded 
buffering replay after
+    // the low level index split and result in subShard leaders that can't 
catch up and become
+    // active (a known issue that still needs to be resolved.)
+    splitWithConcurrentUpdates("livesplit-1-1", 1, 4, false);
+  }
+
+  @Test
+  public void testConcurrentSplitThreeHostsRepFactorTwo() throws Exception {
+    setupCluster(3);
+    splitWithConcurrentUpdates("livesplit-3-2", 2, 4, true);
+  }
+
+  private void splitWithConcurrentUpdates(
+      String collectionName, int repFactor, int nThreads, boolean 
setPreferredLeaders)
+      throws Exception {
     final CloudSolrClient client = createCollection(collectionName, repFactor);
 
     final ConcurrentHashMap<String, Long> model =
         new ConcurrentHashMap<>(); // what the index should contain
     final AtomicBoolean doIndex = new AtomicBoolean(true);
-    final AtomicInteger docsIndexed = new AtomicInteger();
+    final AtomicInteger numDocsAdded = new AtomicInteger();
+    final AtomicInteger numDocsDeleted = new AtomicInteger();
     Thread[] indexThreads = new Thread[nThreads];
     try {
 
@@ -239,23 +291,38 @@ public class SplitShardTest extends SolrCloudTestCase {
         indexThreads[i] =
             new Thread(
                 () -> {
+                  Random random = random();
+                  List<Integer> threadDocIndexes = new ArrayList<>();
                   while (doIndex.get()) {
                     try {
                       // Thread.sleep(10);  // cap indexing rate at 100 docs 
per second, per thread
-                      int currDoc = docsIndexed.incrementAndGet();
+                      int currDoc = numDocsAdded.incrementAndGet();
                       String docId = "doc_" + currDoc;
 
                       // Try all docs in the same update request
-                      UpdateRequest updateReq = new UpdateRequest();
-                      updateReq.add(sdoc("id", docId));
+                      UpdateRequest addReq = new UpdateRequest();
+                      addReq.add(sdoc("id", docId));
                       // UpdateResponse ursp = updateReq.commit(client, 
collectionName);
                       // uncomment this if you want a commit each time
-                      UpdateResponse ursp = updateReq.process(client, 
collectionName);
+                      UpdateResponse ursp = addReq.process(client, 
collectionName);
                       assertEquals(0, ursp.getStatus()); // for now, don't 
accept any failures
                       if (ursp.getStatus() == 0) {
                         // in the future, keep track of a version per document 
and reuse ids to keep
                         // index from growing too large
                         model.put(docId, 1L);
+                        threadDocIndexes.add(currDoc);
+                      }
+
+                      if (currDoc % 20 == 0) {
+                        int docIndex =
+                            
threadDocIndexes.remove(random.nextInt(threadDocIndexes.size()));
+                        docId = "doc_" + docIndex;
+                        UpdateRequest deleteReq = new UpdateRequest();
+                        deleteReq.deleteById(docId);
+                        ursp = deleteReq.process(client, collectionName);
+                        assertEquals(0, ursp.getStatus()); // for now, don't 
accept any failures
+                        model.remove(docId);
+                        numDocsDeleted.incrementAndGet();
                       }
                     } catch (Exception e) {
                       fail(e.getMessage());
@@ -274,13 +341,22 @@ public class SplitShardTest extends SolrCloudTestCase {
 
       CollectionAdminRequest.SplitShard splitShard =
           
CollectionAdminRequest.splitShard(collectionName).setShardName("shard1");
-      splitShard.process(client);
-      waitForState(
-          "Timed out waiting for sub shards to be active.",
-          collectionName,
-          activeClusterShape(
-              2,
-              3 * repFactor)); // 2 repFactor for the new split shards, 1 
repFactor for old replicas
+      // Set the preferred leaders param either with a request param, or with 
a system property.
+      if (random().nextBoolean()) {
+        splitShard.shouldSetPreferredLeaders(setPreferredLeaders);
+      } else {
+        System.setProperty(AUTO_PREFERRED_LEADERS, 
Boolean.toString(setPreferredLeaders));
+      }
+      try {
+        splitShard.process(client);
+        waitForState(
+            "Timed out waiting for sub shards to be active.",
+            collectionName,
+            // 2 repFactor for the new split shards, 1 repFactor for old 
replicas
+            activeClusterShape(2, 3 * repFactor));
+      } finally {
+        System.clearProperty(AUTO_PREFERRED_LEADERS);
+      }
 
       // make sure that docs were indexed during the split
       assertTrue(model.size() > docCount);
@@ -311,11 +387,47 @@ public class SplitShardTest extends SolrCloudTestCase {
       log.error("MISSING DOCUMENTS: {}", leftover);
     }
 
-    assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
-    log.info("Number of documents indexed and queried : {}", numDocs);
+    assertEquals(
+        "Documents are missing!"
+            + " numDocsAdded="
+            + numDocsAdded.get()
+            + " numDocsDeleted="
+            + numDocsDeleted.get()
+            + " numDocs="
+            + numDocs,
+        numDocsAdded.get() - numDocsDeleted.get(),
+        numDocs);
+    if (log.isInfoEnabled()) {
+      log.info("{} docs added, {} docs deleted", numDocsAdded.get(), 
numDocsDeleted.get());
+    }
+
+    if (setPreferredLeaders) {
+      DocCollection collection =
+          
cluster.getSolrClient().getClusterState().getCollection(collectionName);
+      for (Slice slice : collection.getSlices()) {
+        if (!slice.getState().equals(Slice.State.ACTIVE)) {
+          continue;
+        }
+        boolean preferredLeaderFound = false;
+        for (Replica replica : slice.getReplicas()) {
+          if (replica.getBool(CollectionAdminParams.PROPERTY_PREFIX + 
"preferredleader", false)) {
+            preferredLeaderFound = true;
+            assertEquals(
+                "Replica "
+                    + replica.getName()
+                    + " is the preferred leader but not the leader of shard "
+                    + slice.getName(),
+                slice.getLeader(),
+                replica);
+          }
+        }
+        assertTrue("No preferred leader found for shard " + slice.getName(), 
preferredLeaderFound);
+      }
+    }
   }
 
   public void testShardSplitWithNodeset() throws Exception {
+    setupCluster(1);
     String COLL = "shard_split_nodeset";
 
     CollectionAdminRequest.createCollection(COLL, "conf", 2, 
2).process(cluster.getSolrClient());
@@ -371,16 +483,4 @@ public class SplitShardTest extends SolrCloudTestCase {
 
     return set.isEmpty();
   }
-
-  @Test
-  public void testLiveSplit() throws Exception {
-    // Debugging tips: if this fails, it may be easier to debug by lowering 
the number fo threads to
-    // 1 and looping the test until you get another failure. You may need to 
further instrument
-    // things like DistributedZkUpdateProcessor to display the cluster state 
for the collection,
-    // etc. Using more threads increases the chance to hit a concurrency bug, 
but too many threads
-    // can overwhelm single-threaded buffering replay after the low level 
index split and result in
-    // subShard leaders that can't catch up and become active (a known issue 
that still needs to be
-    // resolved.)
-    doLiveSplitShard("livesplit1", 1, 4);
-  }
 }
diff --git 
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java 
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index e3317c44120..515c387453d 100644
--- 
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ 
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud.api.collections;
 
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static 
org.apache.solr.common.params.CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -52,6 +53,7 @@ import org.apache.solr.cloud.BasicDistributedZkTest;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.StoppableIndexingThread;
 import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocCollection;
@@ -711,7 +713,13 @@ public class ShardSplitTest extends BasicDistributedZkTest 
{
         trySplit(collectionName, null, SHARD1, 1);
         fail("expected to fail due to locking but succeeded");
       } catch (Exception e) {
-        log.info("Expected failure: {}", e);
+        // Verify the exception caught.
+        // If the split command sets the preferred leader, this checks that the exception is not a
+        // TimeoutException because we don't want the split combined operation 
to wait for the split
+        // completion if the split fails.
+        assertTrue("Unexpected exception " + e, e instanceof SolrException);
+        assertEquals(SolrException.ErrorCode.INVALID_STATE.code, 
((SolrException) e).code());
+        log.info("Expected failure:", e);
       }
 
       // make sure the lock still exists
@@ -1253,6 +1261,9 @@ public class ShardSplitTest extends 
BasicDistributedZkTest {
     if (splitKey != null) {
       params.set("split.key", splitKey);
     }
+    if (random().nextBoolean()) {
+      params.set(SPLIT_SET_PREFERRED_LEADERS, true);
+    }
     QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
diff --git 
a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java 
b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
index e074f0ba40d..dea2b768d08 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
@@ -36,7 +36,6 @@ import org.apache.solr.api.ApiBag;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.MultiMapSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -44,7 +43,6 @@ import org.apache.solr.common.util.CommandOperation;
 import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.ClusterAPI;
 import org.apache.solr.handler.CollectionsAPI;
 import org.apache.solr.request.LocalSolrQueryRequest;
@@ -310,23 +308,16 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
     MockCollectionsHandler() {}
 
     @Override
-    protected CoreContainer checkErrors() {
-      return null;
-    }
+    protected void checkCoreContainer() {}
 
     @Override
     protected void copyFromClusterProp(Map<String, Object> props, String prop) 
{}
 
     @Override
-    void invokeAction(
-        SolrQueryRequest req,
-        SolrQueryResponse rsp,
-        CoreContainer cores,
-        CollectionParams.CollectionAction action,
-        CollectionOperation operation)
+    void invokeOperation(SolrQueryRequest req, SolrQueryResponse rsp, 
CollectionOperation operation)
         throws Exception {
       Map<String, Object> result = null;
-      if (action == CollectionParams.CollectionAction.COLLECTIONPROP) {
+      if (operation.equals(CollectionOperation.COLLECTIONPROP_OP)) {
         // Fake this action, since we don't want to write to ZooKeeper in this 
test
         result = new HashMap<>();
         result.put(NAME, req.getParams().required().get(NAME));
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
index d2995f014d5..eb2c35d71af 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
@@ -262,6 +262,18 @@ One simple way to populate `id_prefix` is a copyField in 
the schema:
   </fieldtype>
 ----
 
+`split.setPreferredLeaders`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the SPLITSHARD command automatically sets the preferred leader property on one replica of each
+sub-shard when splitting a shard. The preferred leaders are distributed evenly among the nodes.
+It is also possible to change the default value of this parameter to `true` by 
setting the system property
+`solr.autoPreferredLeaders` to `true`.
+
 Current implementation details and limitations:
 
 * Prefix size is calculated using number of documents with the prefix.
diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
 
b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 0a41b2c782a..edf39868c06 100644
--- 
a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ 
b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1567,6 +1567,7 @@ public abstract class CollectionAdminRequest<T extends 
CollectionAdminResponse>
     protected Boolean splitByPrefix;
     protected Integer numSubShards;
     protected Float splitFuzz;
+    protected Boolean setPreferredLeaders;
 
     private Properties properties;
     protected String createNodeSet;
@@ -1649,6 +1650,11 @@ public abstract class CollectionAdminRequest<T extends 
CollectionAdminResponse>
       return this;
     }
 
+    public SplitShard shouldSetPreferredLeaders(Boolean setPreferredLeaders) {
+      this.setPreferredLeaders = setPreferredLeaders;
+      return this;
+    }
+
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@@ -1669,17 +1675,18 @@ public abstract class CollectionAdminRequest<T extends 
CollectionAdminResponse>
       if (splitFuzz != null) {
         params.set(CommonAdminParams.SPLIT_FUZZ, String.valueOf(splitFuzz));
       }
-
       if (splitByPrefix != null) {
         params.set(CommonAdminParams.SPLIT_BY_PREFIX, splitByPrefix);
       }
-
       if (properties != null) {
         addProperties(params, properties);
       }
       if (createNodeSet != null) {
         params.set(CREATE_NODE_SET_PARAM, createNodeSet);
       }
+      if (setPreferredLeaders != null) {
+        params.set(CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS, 
setPreferredLeaders);
+      }
       return params;
     }
   }
diff --git 
a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java 
b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
index 695eaf271cb..e9acc759f75 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
@@ -36,4 +36,10 @@ public interface CommonAdminParams {
   String TIMEOUT = "timeout";
   /** Inexact shard splitting factor. */
   String SPLIT_FUZZ = "splitFuzz";
+  /**
+   * Boolean param to automatically set the preferred leader property on one replica of each
+   * sub-shard when splitting a shard. The preferred leaders are distributed evenly among the
+   * nodes.
+   */
+  String SPLIT_SET_PREFERRED_LEADERS = "split.setPreferredLeaders";
 }

Reply via email to