This is an automated email from the ASF dual-hosted git repository.
broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 58d34f10fb0 SOLR-16438: Support optional split.setPreferredLeaders
prop in shard split command.
58d34f10fb0 is described below
commit 58d34f10fb0e933d1fe91f681ebee1f7306df041
Author: Bruno Roustant <[email protected]>
AuthorDate: Fri Dec 23 14:30:19 2022 +0100
SOLR-16438: Support optional split.setPreferredLeaders prop in shard split
command.
---
...istributedCollectionConfigSetCommandRunner.java | 12 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 146 ++++++++++------
.../solr/handler/admin/CollectionsHandler.java | 192 +++++++++++++++++---
.../solr/handler/admin/RebalanceLeaders.java | 90 +++++++++-
.../test/org/apache/solr/cloud/SplitShardTest.java | 194 ++++++++++++++++-----
.../solr/cloud/api/collections/ShardSplitTest.java | 13 +-
.../solr/handler/admin/TestCollectionAPIs.java | 15 +-
.../deployment-guide/pages/shard-management.adoc | 12 ++
.../solrj/request/CollectionAdminRequest.java | 11 +-
.../solr/common/params/CommonAdminParams.java | 6 +
10 files changed, 546 insertions(+), 145 deletions(-)
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
index 07fe08f377b..caf001140ad 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
@@ -163,9 +163,9 @@ public class DistributedCollectionConfigSetCommandRunner {
}
/**
- * When {@link
org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue
to
- * overseer queue and instead calls this method, this method is expected to
do the equivalent of
- * what Overseer does in {@link
+ * When {@link
org.apache.solr.handler.admin.CollectionsHandler#invokeOperation} does not
enqueue
+ * to overseer queue and instead calls this method, this method is expected
to do the equivalent
+ * of what Overseer does in {@link
* org.apache.solr.cloud.OverseerConfigSetMessageHandler#processMessage}.
*
* <p>The steps leading to that call in the Overseer execution path are (and
the equivalent is
@@ -235,9 +235,9 @@ public class DistributedCollectionConfigSetCommandRunner {
}
/**
- * When {@link
org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue
to
- * overseer queue and instead calls this method, this method is expected to
do the equivalent of
- * what Overseer does in {@link
+ * When {@link
org.apache.solr.handler.admin.CollectionsHandler#invokeOperation} does not
enqueue
+ * to overseer queue and instead calls this method, this method is expected
to do the equivalent
+ * of what Overseer does in {@link
*
org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler#processMessage}.
*
* <p>The steps leading to that call in the Overseer execution path are (and
the equivalent is
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 23473694b59..3b3ab835359 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -28,7 +28,9 @@ import static
org.apache.solr.common.params.CollectionParams.CollectionAction.CR
import static
org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.NUM_SUB_SHARDS;
+import static
org.apache.solr.handler.admin.CollectionsHandler.AUTO_PREFERRED_LEADERS;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,6 +48,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.VersionedData;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
@@ -114,21 +117,22 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
* <ul>
* <li>1. Verify that there is enough disk space to create sub-shards.
* <li>2. If splitByPrefix is true, make request to get prefix ranges.
- * <li>3. If this split was attempted previously and there are lingering
sub-shards, delete
- * them.
- * <li>4. Create sub-shards in CONSTRUCTION state.
- * <li>5. Add an initial replica to each sub-shard.
- * <li>6. Request that parent shard wait for children to become ACTIVE.
- * <li>7. Execute split: either LINK or REWRITE.
- * <li>8. Apply buffered updates to the sub-shards so they are up-to-date
with parent.
- * <li>9. Determine node placement for additional replicas (but do not
create yet).
- * <li>10. If replicationFactor is more than 1, set shard state for
sub-shards to RECOVERY; else
+ * <li>3. Fill the sub-shard ranges.
+ * <li>4. If this split was attempted previously and there are lingering
INACTIVE sub-shards,
+ * delete them.
+ * <li>5. Create sub-shards in CONSTRUCTION state.
+ * <li>6. Add an initial replica to each sub-shard.
+ * <li>7. Request that parent shard wait for children to become ACTIVE.
+ * <li>8. Execute split: either LINK or REWRITE.
+ * <li>9. Apply buffered updates to the sub-shards so they are up-to-date
with parent.
+ * <li>10. Determine node placement for additional replicas (but do not
create yet).
+ * <li>11. If replicationFactor is more than 1, set shard state for
sub-shards to RECOVERY; else
* mark ACTIVE.
- * <li>11. Create additional replicas of sub-shards.
+ * <li>12. Create additional replicas of sub-shards.
+ * <li>13. If setPreferredLeaders param is true, set the preferred leader
property on one
+ * replica of each sub-shard. Distribute preferred leaders evenly
among the nodes.
* </ul>
*
- * <br>
- *
* <p>There is a shard split doc (dev-docs/shard-split/shard-split.adoc) on
how shard split works;
* illustrated with diagrams.
*/
@@ -192,19 +196,25 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Interrupted.");
}
+ boolean setPreferredLeaders =
+ message.getBool(
+ CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS,
+ Boolean.getBoolean(AUTO_PREFERRED_LEADERS));
+
+ // 1. Verify that there is enough disk space to create sub-shards.
RTimerTree t;
- if (ccc.getCoreContainer().getNodeConfig().getMetricsConfig().isEnabled())
{
- // check disk space for shard split
- if
(Boolean.parseBoolean(System.getProperty(SHARDSPLIT_CHECKDISKSPACE_ENABLED,
"true"))) {
- // 1. verify that there is enough space on disk to create sub-shards
+ if (ccc.getCoreContainer().getNodeConfig().getMetricsConfig().isEnabled()
+ &&
Boolean.parseBoolean(System.getProperty(SHARDSPLIT_CHECKDISKSPACE_ENABLED,
"true"))) {
+ if (log.isDebugEnabled()) {
log.debug(
- "SplitShardCmd: verify that there is enough space on disk to
create sub-shards for slice: {}",
+ "Check disk space before splitting shard {} on replica {}",
+ slice.get(),
parentShardLeader);
- t = timings.sub("checkDiskSpace");
- checkDiskSpace(
- collectionName, slice.get(), parentShardLeader, splitMethod,
ccc.getSolrCloudManager());
- t.stop();
}
+ t = timings.sub("checkDiskSpace");
+ checkDiskSpace(
+ collectionName, slice.get(), parentShardLeader, splitMethod,
ccc.getSolrCloudManager());
+ t.stop();
}
// let's record the ephemeralOwner of the parent leader node
@@ -283,8 +293,7 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
ShardHandler shardHandler = ccc.newShardHandler();
- // 2. if split request has splitByPrefix set to true, make request to
SplitOp to get prefix
- // ranges of sub-shards
+ // 2. If splitByPrefix is true, make request to get prefix ranges.
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
t = timings.sub("getRanges");
@@ -328,8 +337,8 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
t.stop();
}
+ // 3. Fill the sub-shard ranges.
t = timings.sub("fillRanges");
-
String rangesStr =
fillRanges(
ccc.getSolrCloudManager(),
@@ -342,8 +351,8 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
firstNrtReplica);
t.stop();
- // 3. if this shard has attempted a split before and failed, there will
be lingering INACTIVE
- // sub-shards. Clean these up before proceeding
+ // 4. If this split was attempted previously and there are lingering
INACTIVE sub-shards,
+ // delete them.
boolean oldShardsDeleted = false;
for (String subSlice : subSlices) {
Slice oSlice = collection.getSlice(subSlice);
@@ -374,16 +383,14 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
}
}
}
-
if (oldShardsDeleted) {
// refresh the locally cached cluster state
// we know we have the latest because otherwise deleteshard would have
failed
clusterState = zkStateReader.getClusterState();
}
- // 4. create the child sub-shards in CONSTRUCTION state
+ // 5. Create sub-shards in CONSTRUCTION state.
String nodeName = parentShardLeader.getNodeName();
-
t = timings.sub("createSubSlicesAndLeadersInState");
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
@@ -419,7 +426,7 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
CollectionHandlingUtils.waitForNewShard(
collectionName, subSlice, ccc.getZkStateReader());
- // 5. and add the initial replica for each sub-shard
+ // 6. Add an initial replica to each sub-shard.
log.debug(
"Adding first replica {} as part of slice {} of collection {} on
{}",
subShardName,
@@ -459,7 +466,7 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
}
t.stop();
- // 6. request that parent shard wait for children to become active
+ // 7. Request that parent shard wait for children to become ACTIVE.
t = timings.sub("waitForSubSliceLeadersAlive");
{
final ShardRequestTracker shardRequestTracker =
@@ -489,12 +496,6 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
}
t.stop();
- log.debug(
- "Successfully created all sub-shards for collection {} parent shard:
{} on: {}",
- collectionName,
- slice,
- parentShardLeader);
-
if (log.isInfoEnabled()) {
log.info(
"Splitting shard {} as part of slice {} of collection {} on {}",
@@ -504,7 +505,8 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
parentShardLeader);
}
- // 7. execute actual split
+ // 8. Execute split: either LINK or REWRITE.
+ t = timings.sub("splitParentCore");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION,
CoreAdminParams.CoreAdminAction.SPLIT.toString());
params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
@@ -514,8 +516,6 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
params.set(CoreAdminParams.RANGES, rangesStr);
-
- t = timings.sub("splitParentCore");
{
final ShardRequestTracker shardRequestTracker =
CollectionHandlingUtils.asyncRequestTracker(asyncId, ccc);
@@ -526,12 +526,11 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
handleFailureOnAsyncRequest(results, msgOnError);
}
t.stop();
-
if (log.isDebugEnabled()) {
log.debug("Index on shard: {} split into {} successfully", nodeName,
subShardNames.size());
}
- // 8. apply buffered updates on sub-shards
+ // 9. Apply buffered updates to the sub-shards, so they are up-to-date
with parent.
t = timings.sub("applyBufferedUpdates");
{
final ShardRequestTracker shardRequestTracker =
@@ -557,9 +556,9 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
handleFailureOnAsyncRequest(results, msgOnError);
}
t.stop();
-
log.debug("Successfully applied buffered updates on : {}",
subShardNames);
+ // 10. Determine node placement for additional replicas (but do not
create yet).
// TODO: change this to handle sharding a slice into > 2 sub-shards.
// we have already created one subReplica for each subShard on the
parent node.
@@ -737,8 +736,8 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
// this ensures that the logic inside ReplicaMutator to update sub-shard
state to 'active'
// always gets a chance to execute. See SOLR-7673
- // 10. if replicationFactor > 1, set shard state for sub-shards to
RECOVERY; otherwise mark
- // ACTIVE
+ // 11. If replicationFactor is more than 1, set shard state for
sub-shards to RECOVERY; else
+ // mark ACTIVE.
if (repFactor == 1) {
// A commit is needed so that documents are visible when the sub-shard
replicas come up
// (Note: This commit used to be after the state switch, but was
brought here before the
@@ -802,14 +801,12 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
});
}
+ // 12. Create additional replicas of sub-shards.
t = timings.sub("createCoresForReplicas");
- // 11. now actually create replica cores on sub shard nodes
for (Map<String, Object> replica : replicas) {
new AddReplicaCmd(ccc).addReplica(clusterState, new
ZkNodeProps(replica), results, null);
}
-
assert TestInjection.injectSplitFailureAfterReplicaCreation();
-
{
final ShardRequestTracker syncRequestTracker =
CollectionHandlingUtils.syncRequestTracker(ccc);
@@ -818,9 +815,45 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
handleFailureOnAsyncRequest(results, msgOnError);
}
t.stop();
-
log.info("Successfully created all replica shards for all sub-slices
{}", subSlices);
+ // 13. If setPreferredLeaders param is true, set the preferred leader
property on one replica
+ // of each sub-shard. Distribute preferred leaders evenly among the
nodes.
+ if (setPreferredLeaders && repFactor > 1) {
+ t = timings.sub("setPreferredLeaders");
+ log.info("Setting the preferred leaders");
+ clusterState = zkStateReader.getClusterState();
+ collection = clusterState.getCollection(collectionName);
+
+ // Keep the leader on the current node for the first sub-shard.
+ Map<String, Integer> numLeadersPerNode = new HashMap<>();
+ {
+ String subSliceName = subSlices.get(0);
+ Replica replica = collection.getSlice(subSliceName).getLeader();
+ setPreferredLeaderProp(collectionName, subSliceName,
replica.getName());
+ numLeadersPerNode.put(replica.getNodeName(), 1);
+ }
+
+ // Distribute the preferred leaders for the other sub-shards evenly
among the nodes.
+ for (String subSliceName : subSlices.subList(1, subSlices.size())) {
+ Slice subSlice = collection.getSlice(subSliceName);
+ Replica selectedReplica = null;
+ int minNumLeaders = Integer.MAX_VALUE;
+ for (Replica replica : subSlice.getReplicas()) {
+ int numLeaders =
numLeadersPerNode.getOrDefault(replica.getNodeName(), 0);
+ if (numLeaders < minNumLeaders) {
+ selectedReplica = replica;
+ minNumLeaders = numLeaders;
+ }
+ }
+ assert selectedReplica != null;
+ setPreferredLeaderProp(collectionName, subSliceName,
selectedReplica.getName());
+ numLeadersPerNode.compute(
+ selectedReplica.getNodeName(), (__, n) -> n == null ? 1 : n + 1);
+ }
+ t.stop();
+ }
+
// The final commit was added in SOLR-4997 so that documents are visible
// when the sub-shard replicas come up
if (repFactor > 1) {
@@ -832,6 +865,9 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
if (withTiming) {
results.add(CommonParams.TIMING, timings.asNamedList());
}
+ if (log.isInfoEnabled()) {
+ log.info("Timings for split operations: {}", timings.asNamedList());
+ }
success = true;
// don't unlock the shard yet - only do this if the final switch-over in
ReplicaMutator
// succeeds (or fails)
@@ -854,6 +890,15 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
}
}
+ private void setPreferredLeaderProp(String collectionName, String shardName,
String replicaName)
+ throws IOException {
+ log.info("Setting replica {} as the preferred leader of shard {}",
replicaName, shardName);
+ CollectionAdminRequest.AddReplicaProp addProp =
+ CollectionAdminRequest.addReplicaProperty(
+ collectionName, shardName, replicaName, "preferredleader", "true");
+ ccc.getSolrCloudManager().request(addProp);
+ }
+
/**
* In case of async requests, the ShardRequestTracker's processResponses()
does not abort on
* failure (as it should). Handling this here temporarily for now.
@@ -1227,6 +1272,7 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
public static boolean lockForSplit(SolrCloudManager cloudManager, String
collection, String shard)
throws Exception {
+ log.debug("Getting lock for shard {} split", shard);
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" +
shard + "-splitting";
final DistribStateManager stateManager =
cloudManager.getDistribStateManager();
synchronized (stateManager) {
@@ -1250,12 +1296,14 @@ public class SplitShardCmd implements
CollApiCmds.CollectionApiCommand {
+ shard,
e);
}
+ log.debug("Obtained lock for shard {} split", shard);
return true;
}
}
public static void unlockForSplit(SolrCloudManager cloudManager, String
collection, String shard)
throws Exception {
+ log.debug("Releasing lock for shard {} split", shard);
if (shard != null) {
String path =
ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard +
"-splitting";
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 102077b1bc3..750a43b985a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -106,6 +106,7 @@ import static
org.apache.solr.common.params.CommonAdminParams.NUM_SUB_SHARDS;
import static org.apache.solr.common.params.CommonAdminParams.SPLIT_BY_PREFIX;
import static org.apache.solr.common.params.CommonAdminParams.SPLIT_FUZZ;
import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
+import static
org.apache.solr.common.params.CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS;
import static
org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.TIMING;
@@ -133,6 +134,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -244,6 +246,13 @@ import org.slf4j.LoggerFactory;
public class CollectionsHandler extends RequestHandlerBase implements
PermissionNameProvider {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /**
+ * Boolean system property to automatically set the preferred leaders at
collection creation and
+ * during shard split. Default false. Otherwise, the caller needs to set it
for each {@link
+ * CollectionAdminRequest#splitShard(String) SplitShard} request.
+ */
+ public static final String AUTO_PREFERRED_LEADERS =
"solr.autoPreferredLeaders";
+
protected final CoreContainer coreContainer;
private final Optional<DistributedCollectionConfigSetCommandRunner>
distributedCollectionConfigSetCommandRunner;
@@ -306,7 +315,7 @@ public class CollectionsHandler extends RequestHandlerBase
implements Permission
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
throws Exception {
// Make sure the cores is enabled
- CoreContainer cores = checkErrors();
+ checkCoreContainer();
// Pick the action
SolrParams params = req.getParams();
@@ -329,24 +338,24 @@ public class CollectionsHandler extends
RequestHandlerBase implements Permission
}
CollectionOperation operation = CollectionOperation.get(action);
- invokeAction(req, rsp, cores, action, operation);
+ for (CollectionOperation op : operation.getCombinedOps(req, this)) {
+ invokeOperation(req, rsp, op);
+ if (rsp.getException() != null) {
+ log.warn("Operation {} failed with exception, skipping subsequent
operations", op);
+ break;
+ }
+ }
rsp.setHttpCaching(false);
}
- protected CoreContainer checkErrors() {
- CoreContainer cores = getCoreContainer();
- AdminAPIBase.validateZooKeeperAwareCoreContainer(cores);
- return cores;
+ protected void checkCoreContainer() {
+ AdminAPIBase.validateZooKeeperAwareCoreContainer(getCoreContainer());
}
@SuppressWarnings({"unchecked"})
- void invokeAction(
- SolrQueryRequest req,
- SolrQueryResponse rsp,
- CoreContainer cores,
- CollectionAction action,
- CollectionOperation operation)
+ void invokeOperation(SolrQueryRequest req, SolrQueryResponse rsp,
CollectionOperation operation)
throws Exception {
+ log.debug("Invoking {}", operation);
if (!coreContainer.isZooKeeperAware()) {
throw new SolrException(
BAD_REQUEST, "Invalid request. collections can be accessed only in
SolrCloud mode");
@@ -377,10 +386,10 @@ public class CollectionsHandler extends
RequestHandlerBase implements Permission
// Even if Overseer does wait for the collection to be created, it sees a
different cluster
// state than this node, so this wait is required to make sure the local
node Zookeeper watches
// fired and now see the collection.
- if (action.equals(CollectionAction.CREATE) && asyncId == null) {
- if (rsp.getException() == null) {
- waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
- }
+ if (operation.equals(CollectionOperation.CREATE_OP)
+ && asyncId == null
+ && rsp.getException() == null) {
+ waitForActiveCollection(zkProps.getStr(NAME), getCoreContainer(),
overseerResponse);
}
}
@@ -886,7 +895,6 @@ public class CollectionsHandler extends RequestHandlerBase
implements Permission
SPLITSHARD,
DEFAULT_COLLECTION_OP_TIMEOUT * 5,
(req, rsp, h) -> {
- String name = req.getParams().required().get(COLLECTION_PROP);
// TODO : add support for multiple shards
String shard = req.getParams().get(SHARD_ID_PROP);
String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
@@ -932,8 +940,75 @@ public class CollectionsHandler extends RequestHandlerBase
implements Permission
SPLIT_FUZZ,
SPLIT_BY_PREFIX,
FOLLOW_ALIASES,
- CREATE_NODE_SET_PARAM);
+ CREATE_NODE_SET_PARAM,
+ SPLIT_SET_PREFERRED_LEADERS);
return copyPropertiesWithPrefix(req.getParams(), map,
PROPERTY_PREFIX);
+ },
+ SplitShardHelper.OP_COMBINER),
+ /**
+ * Waits for a shard split to complete. Waits until the shard state is
switched to INACTIVE (in
+ * ReplicaMutator.checkAndCompleteShardSplit). At the same time, the
sub-shard states become
+ * ACTIVE.
+ */
+ WAIT_FOR_SHARD_SPLIT_OP(
+ null,
+ (req, rsp, h) -> {
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+ String shardName = req.getParams().get(SHARD_ID_PROP);
+ log.info("Waiting for shard {} split to complete", shardName);
+ long startTime = System.nanoTime();
+ h.coreContainer
+ .getZkController()
+ .getZkStateReader()
+ .waitForState(
+ collectionName,
+ 1,
+ TimeUnit.HOURS,
+ collection -> {
+ Slice splitSlice = collection.getSlice(shardName);
+ boolean splitComplete =
+ splitSlice == null ||
splitSlice.getState().equals(Slice.State.INACTIVE);
+ if (splitComplete) {
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Shard {} split completed in {} ms",
+ shardName,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTime));
+ }
+ }
+ return splitComplete;
+ });
+ return null;
+ }),
+ /** Waits for the shard preferred leader to become the leader. */
+ WAIT_FOR_PREFERRED_LEADER_OP(
+ null,
+ (req, rsp, h) -> {
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+ String shardName = req.getParams().get(SHARD_ID_PROP);
+ log.info("Waiting for shard {} preferred leader to become the
leader", shardName);
+ long startTime = System.nanoTime();
+ h.coreContainer
+ .getZkController()
+ .getZkStateReader()
+ .waitForState(
+ collectionName,
+ 5,
+ TimeUnit.MINUTES,
+ collection -> {
+ boolean isLeader =
+
SplitShardHelper.isPreferredLeaderCurrentLeader(collection, shardName, h);
+ if (isLeader) {
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Shard {} preferred leader is leader in {} ms",
+ shardName,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTime));
+ }
+ }
+ return isLeader;
+ });
+ return null;
}),
DELETESHARD_OP(
DELETESHARD,
@@ -1860,22 +1935,33 @@ public class CollectionsHandler extends
RequestHandlerBase implements Permission
public final CollectionOp fun;
final CollectionAction action;
final long timeOut;
+ final CollectionOpCombiner opCombiner;
CollectionOperation(CollectionAction action, CollectionOp fun) {
- this(action, DEFAULT_COLLECTION_OP_TIMEOUT, fun);
+ this(action, DEFAULT_COLLECTION_OP_TIMEOUT, fun,
CollectionOpCombiner.SINGLE_OP);
}
- CollectionOperation(CollectionAction action, long timeOut, CollectionOp
fun) {
+ CollectionOperation(
+ CollectionAction action, long timeOut, CollectionOp fun,
CollectionOpCombiner opCombiner) {
this.action = action;
this.timeOut = timeOut;
this.fun = fun;
+ this.opCombiner = opCombiner;
+ if (action != null) {
+ OperationMap.map.put(action, this);
+ }
}
public static CollectionOperation get(CollectionAction action) {
- for (CollectionOperation op : values()) {
- if (op.action == action) return op;
+ CollectionOperation op = OperationMap.map.get(action);
+ if (op == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " +
action);
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " +
action);
+ return op;
+ }
+
+ List<CollectionOperation> getCombinedOps(SolrQueryRequest req,
CollectionsHandler h) {
+ return opCombiner.getCombinedOps(this, req, h);
}
@Override
@@ -1883,6 +1969,58 @@ public class CollectionsHandler extends
RequestHandlerBase implements Permission
SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
throws Exception {
return fun.execute(req, rsp, h);
}
+
+ private static class OperationMap {
+ static final Map<CollectionAction, CollectionOperation> map =
+ new EnumMap<>(CollectionAction.class);
+ }
+
+ private static class SplitShardHelper {
+ static final CollectionOpCombiner OP_COMBINER =
+ (op, req, h) -> {
+ if (!req.getParams()
+ .getBool(SPLIT_SET_PREFERRED_LEADERS,
Boolean.getBoolean(AUTO_PREFERRED_LEADERS))) {
+ return Collections.singletonList(op);
+ }
+ // The split.setPreferredLeaders prop is true.
+ List<CollectionOperation> opSequence = new ArrayList<>();
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+ String shardName = req.getParams().get(SHARD_ID_PROP);
+ DocCollection collection =
+ h.coreContainer
+ .getZkController()
+ .getZkStateReader()
+ .getClusterState()
+ .getCollection(collectionName);
+ // Ensure we split a preferred leader to help cluster balancing.
+ if (!isPreferredLeaderCurrentLeader(collection, shardName, h)) {
+ // A replica of the shard is defined as preferred leader, but is
not the current
+ // leader yet.
+ opSequence.add(REBALANCELEADERS_OP); // Rebalance the leader on
shard.
+ opSequence.add(WAIT_FOR_PREFERRED_LEADER_OP); // Wait for the
rebalancing completion.
+ }
+ opSequence.add(op); // Split the shard and set the sub-shards
preferred leaders.
+ opSequence.add(WAIT_FOR_SHARD_SPLIT_OP); // Wait for the shard
split completion.
+ opSequence.add(REBALANCELEADERS_OP); // Rebalance the leaders on
the sub-shards.
+ return opSequence;
+ };
+
+ static boolean isPreferredLeaderCurrentLeader(
+ DocCollection collection, String shardName, CollectionsHandler h) {
+ Slice slice = collection.getSlice(shardName);
+ if (slice == null) {
+ throw new SolrException(
+ ErrorCode.BAD_REQUEST, "Shard '" + shardName + "' does not
exist, no action taken.");
+ }
+ for (Replica replica : slice.getReplicas()) {
+ if (replica.getBool(PROPERTY_PREFIX + "preferredleader", false)) {
+ return replica.equals(slice.getLeader());
+ }
+ }
+ // No replicas have the preferred leader property.
+ return true;
+ }
+ }
}
private static void forceLeaderElection(SolrQueryRequest req,
CollectionsHandler handler) {
@@ -2069,6 +2207,14 @@ public class CollectionsHandler extends
RequestHandlerBase implements Permission
throws Exception;
}
+ interface CollectionOpCombiner {
+
+ CollectionOpCombiner SINGLE_OP = (op, req, h) ->
Collections.singletonList(op);
+
+ List<CollectionOperation> getCombinedOps(
+ CollectionOperation op, SolrQueryRequest req, CollectionsHandler h);
+ }
+
@Override
public Boolean registerV2() {
return Boolean.TRUE;
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index 42aa03c54ea..f354ed18589 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -127,6 +127,9 @@ class RebalanceLeaders {
void execute() throws KeeperException, InterruptedException {
DocCollection dc = checkParams();
+ if (log.isInfoEnabled()) {
+ log.info("Rebalancing leaders for collection {}", dc.getName());
+ }
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
@@ -139,7 +142,9 @@ class RebalanceLeaders {
for (Slice slice : dc.getSlices()) {
ensurePreferredIsLeader(slice);
if (asyncRequests.size() == max) {
- log.info("Queued {} leader reassignments, waiting for some to
complete.", max);
+ log.info(
+ "Max number '{}' of election queue movements requested, waiting
for some to complete.",
+ max);
keepGoing = waitAsyncRequests(maxWaitSecs, false);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to
wait!
@@ -147,13 +152,14 @@ class RebalanceLeaders {
}
}
if (keepGoing == true) {
+ log.info("Waiting for all election queue movements to complete");
keepGoing = waitAsyncRequests(maxWaitSecs, true);
}
if (keepGoing == true) {
- log.info("All leader reassignments completed.");
+ log.info("All election queue movements completed.");
} else {
log.warn(
- "Exceeded specified timeout of '{}' all leaders may not have been
reassigned'",
+ "Exceeded specified timeout of '{}' while waiting for election queue
movements to complete",
maxWaitSecs);
}
@@ -197,6 +203,8 @@ class RebalanceLeaders {
// Once we've done all the fiddling with the queues, check on the way out to
see if all the active
// preferred leaders that we intended to change are in fact the leaders.
private void checkLeaderStatus() throws InterruptedException,
KeeperException {
+ log.info("Waiting for all leader reassignments to complete");
+ long startTime = System.nanoTime();
for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) {
ClusterState clusterState =
coreContainer.getZkController().getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -210,6 +218,12 @@ class RebalanceLeaders {
// Record for return that the leader changed successfully
pendingOps.remove(slice.getName());
addToSuccesses(slice, replica);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Preferred leader replica {} is the leader of shard {}",
+ replica.getName(),
+ slice.getName());
+ }
break;
}
}
@@ -220,6 +234,13 @@ class RebalanceLeaders {
coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
}
addAnyFailures();
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Finished waiting {} ms for leader reassignments with {} pending ops
{}",
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime),
+ pendingOps.size(),
+ pendingOps);
+ }
}
// The process is:
@@ -232,25 +253,47 @@ class RebalanceLeaders {
// node in the leader election queue is removed and the only remaining node
watching it is
// triggered to become leader.
private void ensurePreferredIsLeader(Slice slice) throws KeeperException,
InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info("Ensuring preferred leader is leader for shard {}",
slice.getName());
+ }
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader
AND active AND not the
// leader already
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false)
{
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica {} of shard {} is not preferred leader",
replica.getName(), slice.getName());
+ }
continue;
}
// OK, we are the preferred leader, are we the actual leader?
if (replica.getBool(LEADER_PROP, false)) {
// We're a preferred leader, but we're _also_ the leader, don't need
to do anything.
addAlreadyLeaderToResults(slice, replica);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica {} of shard {} is preferred leader and already leader",
+ replica.getName(),
+ slice.getName());
+ }
return; // already the leader, do nothing.
}
ZkStateReader zkStateReader =
coreContainer.getZkController().getZkStateReader();
// We're the preferred leader, but someone else is leader. Only become
leader if we're active.
if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) ==
false) {
addInactiveToResults(slice, replica);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica {} of shard {} is preferred leader but not active",
+ replica.getName(),
+ slice.getName());
+ }
return; // Don't try to become the leader if we're not active!
}
+ if (log.isDebugEnabled()) {
+ log.debug("Getting the sorted election nodes for shard {}",
slice.getName());
+ }
List<String> electionNodes =
OverseerTaskProcessor.getSortedElectionNodes(
zkStateReader.getZkClient(),
@@ -273,6 +316,12 @@ class RebalanceLeaders {
String firstWatcher = electionNodes.get(1);
if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) ==
false) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Re-enqueue replica {} to become the first watcher in election
for shard {}",
+ replica.getName(),
+ slice.getName());
+ }
makeReplicaFirstWatcher(slice, replica);
}
@@ -280,10 +329,31 @@ class RebalanceLeaders {
// to check at the end
pendingOps.put(slice.getName(), replica.getName());
String leaderElectionNode = electionNodes.get(0);
- String coreName =
-
slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP);
+ Replica leaderReplica =
slice.getReplica(LeaderElector.getNodeName(leaderElectionNode));
+ String coreName = leaderReplica.getStr(CORE_NAME_PROP);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Move replica {} node {} core {} at the end of the election queue
for shard {}",
+ leaderReplica.getName(),
+ leaderElectionNode,
+ coreName,
+ slice.getName());
+ }
rejoinElectionQueue(slice, leaderElectionNode, coreName, false);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Waiting for replica {} node {} change in the election queue for
shard {}",
+ leaderReplica.getName(),
+ leaderElectionNode,
+ slice.getName());
+ }
waitForNodeChange(slice, leaderElectionNode);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Preferred leader {} is now the first in line for leader election
for shard {}",
+ replica.getName(),
+ slice.getName());
+ }
return; // Done with this slice, skip the rest of the replicas.
}
@@ -419,6 +489,13 @@ class RebalanceLeaders {
// that any requeueing we've done has happened.
int waitForNodeChange(Slice slice, String electionNode)
throws InterruptedException, KeeperException {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Waiting for node {} to rejoin the election queue for shard {}",
+ electionNode,
+ slice.getName());
+ }
+ long startTime = System.nanoTime();
String nodeName = LeaderElector.getNodeName(electionNode);
int oldSeq = LeaderElector.getSeq(electionNode);
for (int idx = 0; idx < 600; ++idx) {
@@ -437,6 +514,9 @@ class RebalanceLeaders {
TimeUnit.MILLISECONDS.sleep(100);
zkStateReader.forciblyRefreshAllClusterStateSlow();
}
+ log.warn(
+ "Timeout waiting for node change after {} ms",
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
return -1;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index dd20903deed..ddcd5c57d55 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -17,22 +17,23 @@
package org.apache.solr.cloud;
+import static
org.apache.solr.handler.admin.CollectionsHandler.AUTO_PREFERRED_LEADERS;
import static org.hamcrest.core.StringContains.containsString;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -44,12 +45,11 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.embedded.JettySolrRunner;
import org.hamcrest.MatcherAssert;
import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,27 +59,51 @@ public class SplitShardTest extends SolrCloudTestCase {
private final String COLLECTION_NAME = "splitshardtest-collection";
- @BeforeClass
- public static void setupCluster() throws Exception {
- System.setProperty("metricsEnabled", "true");
- configureCluster(1).addConfig("conf",
configset("cloud-minimal")).configure();
- }
-
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
- }
-
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
cluster.deleteAllCollections();
+ shutdownCluster();
}
@Test
- public void doTest() throws IOException, SolrServerException {
+ public void testSplitOneHostTwoSubShardsTwoReplicas() throws Exception {
+ setupCluster(1);
+ innerTestSplitTwoSubShardsTwoReplicas();
+ }
+
+ @Test
+ public void testSplitTwoHostsTwoSubShardsTwoReplicas() throws Exception {
+ setupCluster(2);
+ innerTestSplitTwoSubShardsTwoReplicas();
+ }
+
+ @Test
+ public void testSplitThreeHostsTwoSubShardsTwoReplicas() throws Exception {
+ setupCluster(3);
+ innerTestSplitTwoSubShardsTwoReplicas();
+ }
+
+ private void innerTestSplitTwoSubShardsTwoReplicas() throws Exception {
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 1, 2)
+ .process(cluster.getSolrClient());
+
+ cluster.waitForActiveCollection(COLLECTION_NAME, 1, 2);
+
+ CollectionAdminRequest.SplitShard splitShard =
+ CollectionAdminRequest.splitShard(COLLECTION_NAME)
+ .setNumSubShards(2)
+ .setShardName("shard1");
+ splitShard.process(cluster.getSolrClient());
+ waitForState(
+ "Timed out waiting for sub shards to be active", COLLECTION_NAME,
activeClusterShape(2, 6));
+ }
+
+ @Test
+ public void testSplitOneHostFiveSubShardsOneReplica() throws Exception {
+ setupCluster(1);
+
CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
.process(cluster.getSolrClient());
@@ -130,7 +154,8 @@ public class SplitShardTest extends SolrCloudTestCase {
}
@Test
- public void multipleOptionsSplitTest() {
+ public void multipleOptionsSplitTest() throws Exception {
+ setupCluster(1);
CollectionAdminRequest.SplitShard splitShard =
CollectionAdminRequest.splitShard(COLLECTION_NAME)
.setNumSubShards(5)
@@ -145,6 +170,7 @@ public class SplitShardTest extends SolrCloudTestCase {
@Test
public void testSplitFuzz() throws Exception {
+ setupCluster(1);
String collectionName = "splitFuzzCollection";
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
.process(cluster.getSolrClient());
@@ -176,7 +202,11 @@ public class SplitShardTest extends SolrCloudTestCase {
assertEquals("wrong range in s1_1", expected1, delta1);
}
- CloudSolrClient createCollection(String collectionName, int repFactor)
throws Exception {
+ private void setupCluster(int nodeCount) throws Exception {
+ configureCluster(nodeCount).addConfig("conf",
configset("cloud-minimal")).configure();
+ }
+
+ private CloudSolrClient createCollection(String collectionName, int
repFactor) throws Exception {
CollectionAdminRequest.createCollection(collectionName, "conf", 1,
repFactor)
.process(cluster.getSolrClient());
@@ -188,7 +218,7 @@ public class SplitShardTest extends SolrCloudTestCase {
return client;
}
- long getNumDocs(CloudSolrClient client) throws Exception {
+ private long getNumDocs(CloudSolrClient client) throws Exception {
String collectionName = client.getDefaultCollection();
DocCollection collection =
client.getClusterState().getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
@@ -225,13 +255,35 @@ public class SplitShardTest extends SolrCloudTestCase {
return totCount;
}
- void doLiveSplitShard(String collectionName, int repFactor, int nThreads)
throws Exception {
+ @Test
+ public void testConcurrentSplitOneHostRepFactorOne() throws Exception {
+ setupCluster(1);
+ // Debugging tips: if this fails, it may be easier to debug by lowering
the number of threads to
+ // 1 and looping the test until you get another failure.
+ // You may need to further instrument things like
DistributedZkUpdateProcessor to display the
+ // cluster state for the collection, etc. Using more threads increases the
chance to hit a
+ // concurrency bug, but too many threads can overwhelm single-threaded
buffering replay after
+ // the low level index split and result in subShard leaders that can't
catch up and become
+ // active (a known issue that still needs to be resolved.)
+ splitWithConcurrentUpdates("livesplit-1-1", 1, 4, false);
+ }
+
+ @Test
+ public void testConcurrentSplitThreeHostsRepFactorTwo() throws Exception {
+ setupCluster(3);
+ splitWithConcurrentUpdates("livesplit-3-2", 2, 4, true);
+ }
+
+ private void splitWithConcurrentUpdates(
+ String collectionName, int repFactor, int nThreads, boolean
setPreferredLeaders)
+ throws Exception {
final CloudSolrClient client = createCollection(collectionName, repFactor);
final ConcurrentHashMap<String, Long> model =
new ConcurrentHashMap<>(); // what the index should contain
final AtomicBoolean doIndex = new AtomicBoolean(true);
- final AtomicInteger docsIndexed = new AtomicInteger();
+ final AtomicInteger numDocsAdded = new AtomicInteger();
+ final AtomicInteger numDocsDeleted = new AtomicInteger();
Thread[] indexThreads = new Thread[nThreads];
try {
@@ -239,23 +291,38 @@ public class SplitShardTest extends SolrCloudTestCase {
indexThreads[i] =
new Thread(
() -> {
+ Random random = random();
+ List<Integer> threadDocIndexes = new ArrayList<>();
while (doIndex.get()) {
try {
// Thread.sleep(10); // cap indexing rate at 100 docs
per second, per thread
- int currDoc = docsIndexed.incrementAndGet();
+ int currDoc = numDocsAdded.incrementAndGet();
String docId = "doc_" + currDoc;
// Try all docs in the same update request
- UpdateRequest updateReq = new UpdateRequest();
- updateReq.add(sdoc("id", docId));
+ UpdateRequest addReq = new UpdateRequest();
+ addReq.add(sdoc("id", docId));
// UpdateResponse ursp = updateReq.commit(client,
collectionName);
// uncomment this if you want a commit each time
- UpdateResponse ursp = updateReq.process(client,
collectionName);
+ UpdateResponse ursp = addReq.process(client,
collectionName);
assertEquals(0, ursp.getStatus()); // for now, don't
accept any failures
if (ursp.getStatus() == 0) {
// in the future, keep track of a version per document
and reuse ids to keep
// index from growing too large
model.put(docId, 1L);
+ threadDocIndexes.add(currDoc);
+ }
+
+ if (currDoc % 20 == 0) {
+ int docIndex =
+
threadDocIndexes.remove(random.nextInt(threadDocIndexes.size()));
+ docId = "doc_" + docIndex;
+ UpdateRequest deleteReq = new UpdateRequest();
+ deleteReq.deleteById(docId);
+ ursp = deleteReq.process(client, collectionName);
+ assertEquals(0, ursp.getStatus()); // for now, don't
accept any failures
+ model.remove(docId);
+ numDocsDeleted.incrementAndGet();
}
} catch (Exception e) {
fail(e.getMessage());
@@ -274,13 +341,22 @@ public class SplitShardTest extends SolrCloudTestCase {
CollectionAdminRequest.SplitShard splitShard =
CollectionAdminRequest.splitShard(collectionName).setShardName("shard1");
- splitShard.process(client);
- waitForState(
- "Timed out waiting for sub shards to be active.",
- collectionName,
- activeClusterShape(
- 2,
- 3 * repFactor)); // 2 repFactor for the new split shards, 1
repFactor for old replicas
+ // Set the preferred leaders param either with a request param, or with
a system property.
+ if (random().nextBoolean()) {
+ splitShard.shouldSetPreferredLeaders(setPreferredLeaders);
+ } else {
+ System.setProperty(AUTO_PREFERRED_LEADERS,
Boolean.toString(setPreferredLeaders));
+ }
+ try {
+ splitShard.process(client);
+ waitForState(
+ "Timed out waiting for sub shards to be active.",
+ collectionName,
+ // 2 repFactor for the new split shards, 1 repFactor for old
replicas
+ activeClusterShape(2, 3 * repFactor));
+ } finally {
+ System.clearProperty(AUTO_PREFERRED_LEADERS);
+ }
// make sure that docs were indexed during the split
assertTrue(model.size() > docCount);
@@ -311,11 +387,47 @@ public class SplitShardTest extends SolrCloudTestCase {
log.error("MISSING DOCUMENTS: {}", leftover);
}
- assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
- log.info("Number of documents indexed and queried : {}", numDocs);
+ assertEquals(
+ "Documents are missing!"
+ + " numDocsAdded="
+ + numDocsAdded.get()
+ + " numDocsDeleted="
+ + numDocsDeleted.get()
+ + " numDocs="
+ + numDocs,
+ numDocsAdded.get() - numDocsDeleted.get(),
+ numDocs);
+ if (log.isInfoEnabled()) {
+ log.info("{} docs added, {} docs deleted", numDocsAdded.get(),
numDocsDeleted.get());
+ }
+
+ if (setPreferredLeaders) {
+ DocCollection collection =
+
cluster.getSolrClient().getClusterState().getCollection(collectionName);
+ for (Slice slice : collection.getSlices()) {
+ if (!slice.getState().equals(Slice.State.ACTIVE)) {
+ continue;
+ }
+ boolean preferredLeaderFound = false;
+ for (Replica replica : slice.getReplicas()) {
+ if (replica.getBool(CollectionAdminParams.PROPERTY_PREFIX +
"preferredleader", false)) {
+ preferredLeaderFound = true;
+ assertEquals(
+ "Replica "
+ + replica.getName()
+ + " is the preferred leader but not the leader of shard "
+ + slice.getName(),
+ slice.getLeader(),
+ replica);
+ }
+ }
+ assertTrue("No preferred leader found for shard " + slice.getName(),
preferredLeaderFound);
+ }
+ }
}
public void testShardSplitWithNodeset() throws Exception {
+ setupCluster(1);
String COLL = "shard_split_nodeset";
CollectionAdminRequest.createCollection(COLL, "conf", 2,
2).process(cluster.getSolrClient());
@@ -371,16 +483,4 @@ public class SplitShardTest extends SolrCloudTestCase {
return set.isEmpty();
}
-
- @Test
- public void testLiveSplit() throws Exception {
- // Debugging tips: if this fails, it may be easier to debug by lowering
the number fo threads to
- // 1 and looping the test until you get another failure. You may need to
further instrument
- // things like DistributedZkUpdateProcessor to display the cluster state
for the collection,
- // etc. Using more threads increases the chance to hit a concurrency bug,
but too many threads
- // can overwhelm single-threaded buffering replay after the low level
index split and result in
- // subShard leaders that can't catch up and become active (a known issue
that still needs to be
- // resolved.)
- doLiveSplitShard("livesplit1", 1, 4);
- }
}
diff --git
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index e3317c44120..515c387453d 100644
---
a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++
b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud.api.collections;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static
org.apache.solr.common.params.CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -52,6 +53,7 @@ import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
@@ -711,7 +713,13 @@ public class ShardSplitTest extends BasicDistributedZkTest
{
trySplit(collectionName, null, SHARD1, 1);
fail("expected to fail due to locking but succeeded");
} catch (Exception e) {
- log.info("Expected failure: {}", e);
+ // Verify the exception caught.
+      // If the split command sets the preferred leader, this verifies that
the exception is not a
+ // TimeoutException because we don't want the split combined operation
to wait for the split
+ // completion if the split fails.
+ assertTrue("Unexpected exception " + e, e instanceof SolrException);
+ assertEquals(SolrException.ErrorCode.INVALID_STATE.code,
((SolrException) e).code());
+ log.info("Expected failure:", e);
}
// make sure the lock still exists
@@ -1253,6 +1261,9 @@ public class ShardSplitTest extends
BasicDistributedZkTest {
if (splitKey != null) {
params.set("split.key", splitKey);
}
+ if (random().nextBoolean()) {
+ params.set(SPLIT_SET_PREFERRED_LEADERS, true);
+ }
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
diff --git
a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
index e074f0ba40d..dea2b768d08 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
@@ -36,7 +36,6 @@ import org.apache.solr.api.ApiBag;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -44,7 +43,6 @@ import org.apache.solr.common.util.CommandOperation;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.ClusterAPI;
import org.apache.solr.handler.CollectionsAPI;
import org.apache.solr.request.LocalSolrQueryRequest;
@@ -310,23 +308,16 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
MockCollectionsHandler() {}
@Override
- protected CoreContainer checkErrors() {
- return null;
- }
+ protected void checkCoreContainer() {}
@Override
protected void copyFromClusterProp(Map<String, Object> props, String prop)
{}
@Override
- void invokeAction(
- SolrQueryRequest req,
- SolrQueryResponse rsp,
- CoreContainer cores,
- CollectionParams.CollectionAction action,
- CollectionOperation operation)
+ void invokeOperation(SolrQueryRequest req, SolrQueryResponse rsp,
CollectionOperation operation)
throws Exception {
Map<String, Object> result = null;
- if (action == CollectionParams.CollectionAction.COLLECTIONPROP) {
+ if (operation.equals(CollectionOperation.COLLECTIONPROP_OP)) {
// Fake this action, since we don't want to write to ZooKeeper in this
test
result = new HashMap<>();
result.put(NAME, req.getParams().required().get(NAME));
diff --git
a/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
b/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
index d2995f014d5..eb2c35d71af 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/shard-management.adoc
@@ -262,6 +262,18 @@ One simple way to populate `id_prefix` is a copyField in
the schema:
</fieldtype>
----
+`split.setPreferredLeaders`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the SPLITSHARD command sets the preferred leader property on one
replica of each sub-shard
+automatically when splitting a shard. The preferred leaders are distributed
+evenly among the nodes.
+It is also possible to change the default value of this parameter to `true` by
setting the system property
+`solr.autoPreferredLeaders` to `true`.
+
Current implementation details and limitations:
* Prefix size is calculated using number of documents with the prefix.
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 0a41b2c782a..edf39868c06 100644
---
a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1567,6 +1567,7 @@ public abstract class CollectionAdminRequest<T extends
CollectionAdminResponse>
protected Boolean splitByPrefix;
protected Integer numSubShards;
protected Float splitFuzz;
+ protected Boolean setPreferredLeaders;
private Properties properties;
protected String createNodeSet;
@@ -1649,6 +1650,11 @@ public abstract class CollectionAdminRequest<T extends
CollectionAdminResponse>
return this;
}
+ public SplitShard shouldSetPreferredLeaders(Boolean setPreferredLeaders) {
+ this.setPreferredLeaders = setPreferredLeaders;
+ return this;
+ }
+
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@@ -1669,17 +1675,18 @@ public abstract class CollectionAdminRequest<T extends
CollectionAdminResponse>
if (splitFuzz != null) {
params.set(CommonAdminParams.SPLIT_FUZZ, String.valueOf(splitFuzz));
}
-
if (splitByPrefix != null) {
params.set(CommonAdminParams.SPLIT_BY_PREFIX, splitByPrefix);
}
-
if (properties != null) {
addProperties(params, properties);
}
if (createNodeSet != null) {
params.set(CREATE_NODE_SET_PARAM, createNodeSet);
}
+ if (setPreferredLeaders != null) {
+ params.set(CommonAdminParams.SPLIT_SET_PREFERRED_LEADERS,
setPreferredLeaders);
+ }
return params;
}
}
diff --git
a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
index 695eaf271cb..e9acc759f75 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
@@ -36,4 +36,10 @@ public interface CommonAdminParams {
String TIMEOUT = "timeout";
/** Inexact shard splitting factor. */
String SPLIT_FUZZ = "splitFuzz";
+ /**
+ * Boolean param to set the preferred leader property on one replica of each
sub-shard,
+ * automatically when splitting a shard. The preferred leaders are
distributed evenly among the
+ * nodes.
+ */
+ String SPLIT_SET_PREFERRED_LEADERS = "split.setPreferredLeaders";
}