Hello everyone,
I was going to open a Jira ticket with the following content, but Jira
directed me to send an email to the Solr mailing list instead.
I ran into the following problem in Solr, which I fixed with the attached patch.
The election nodes are queued in the following way:
If node1 is the leader you have
node1(1) <- node2(2) <- node3(3) <- node4(4)
where "A <- B" means "B watches A", and each node's sequence number is shown in parentheses.
Now, node4 becomes the first watcher and ends up with the same sequence number as node2:
node1(1) <- Node2(2)
         <- Node4(2) <- Node3(3)
When node1 goes down, Node2 will try to set itself as the first watcher.
For that, it will delete its current node:
Node4(2)<-Node3(3)
And set itself as the first watcher.
Node2(2)
Node4(2) <-Node3(3)
To verify that the node has been set at the front of the queue, Solr will
check that the sequence number has changed.
And there lies the problem: since the sequence number is the same, Solr
won't detect the change, and the node will be unable to become leader.
Instead, Solr could check whether the node is already the first watcher and, if
it is, just send the other nodes with the duplicate sequence number to the back of
the queue. Then, since the node is already the first watcher, there will be
no need to position it as the first watcher again before trying to become
leader.
Index: solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (date 1510919343000)
+++ solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (date 1540199795000)
@@ -16,38 +16,12 @@
*/
package org.apache.solr.handler.admin;
-import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-import java.lang.invoke.MethodHandles;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.request.SolrQueryRequest;
@@ -56,6 +30,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
class RebalanceLeaders {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -79,8 +61,12 @@
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
}
+ log.debug("Forcing update collection");
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
+ log.debug("Reading cluster state");
ClusterState clusterState = coreContainer.getZkController().getClusterState();
+ log.debug("Cluster state is: {}", clusterState);
+
DocCollection dc = clusterState.getCollection(collectionName);
if (dc == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
@@ -120,11 +106,14 @@
final String alreadyLeaders = "alreadyLeaders";
String collectionName = req.getParams().get(COLLECTION_PROP);
+ log.debug("Working on Slice {}", slice.getName());
for (Replica replica : slice.getReplicas()) {
+ log.debug("Checking if replica {} is leader", replica.getName());
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
continue;
}
+ log.debug(" we are the preferred leader, are we the actual leader?");
// OK, we are the preferred leader, are we the actual leader?
if (replica.getBool(LEADER_PROP, false)) {
//We're a preferred leader, but we're _also_ the leader, don't need to do anything.
@@ -158,6 +147,8 @@
return; // Don't try to become the leader if we're not active!
}
+ log.debug("We are preferred and Active but not leader. We should be leader");
+
// Replica is the preferred leader but not the actual leader, do something about that.
// "Something" is
// 1> if the preferred leader isn't first in line, tell it to re-queue itself.
@@ -181,12 +172,30 @@
String firstWatcher = electionNodes.get(1);
- if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
+ final Integer firstWatcherSeq = LeaderElector.getSeq(firstWatcher);
+
+ final boolean sameSeqAsFirstWatcher = electionNodes.stream()
+ .filter(watcher-> LeaderElector.getNodeName(watcher).equals(replica.getName()))
+ .findFirst()
+ .map(LeaderElector::getSeq)
+ .map(firstWatcherSeq::equals)
+ .orElse(false);
+
+ if(sameSeqAsFirstWatcher){
+ log.debug("We {} might be the first watcher. Sending to tail any nodes with the same Sequence", replica.getName());
+ this.queueNodesWithSameSequence(collectionName, slice, replica, firstWatcherSeq);
+ }
+ //if we aren't the first watcher, become the first watcher
+ else if (!LeaderElector.getNodeName(firstWatcher).equals(replica.getName())) {
+ log.debug("We {} are not the first watcher. Becoming first watcher", replica.getName());
makeReplicaFirstWatcher(collectionName, slice, replica);
}
+ else log.debug("We {} are already the first watcher. Adding Core1 to the tail", replica.getName());
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
+ log.debug("Core {} rejoining election", coreName);
rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
+ log.debug("Core {} waiting for node change", coreName);
waitForNodeChange(collectionName, slice, electionNodes.get(0));
@@ -196,40 +205,56 @@
// Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
throws KeeperException, InterruptedException {
-
+ log.debug(" Put the replica {} in at the head of the queue and send all nodes with the same sequence number to the back of the list", replica.getNodeName() + " " + replica.getName());
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
- // First, queue up the preferred leader at the head of the queue.
- int newSeq = -1;
+ log.debug("First, queue up the preferred leader at the head of the queue");
+ int newSeq = -1;
+ String myElectionNode = "";
for (String electionNode : electionNodes) {
+ myElectionNode = electionNode;
if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+ log.debug("Rejoining election for node {}", myElectionNode);
String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
rejoinElection(collectionName, slice, electionNode, coreName, true);
- newSeq = waitForNodeChange(collectionName, slice, electionNode);
+ log.debug("Waiting for node {} to change", myElectionNode);
+ newSeq = waitForNodeChange(collectionName, slice, electionNode);
break;
}
}
if (newSeq == -1) {
+ log.debug("We were waiting for electionNode {} to change but it timed out. Are we offline?", myElectionNode);
return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
}
+ log.debug("Got new Sequence {} for node {}", newSeq, myElectionNode);
+ queueNodesWithSameSequence(collectionName, slice, replica, newSeq);
+ }
- // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
+ private void queueNodesWithSameSequence(String collectionName, Slice slice, Replica replica, int newSeq) throws KeeperException, InterruptedException {
+ final ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+ List<String> electionNodes;// Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
for (String thisNode : electionNodes) {
if (LeaderElector.getSeq(thisNode) > newSeq) {
+ log.debug("Current node sequence is higher than our node. We are done");
break;
}
if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
+ log.debug("Node {} equals the leader replica. Skipping....", thisNode);
continue;
}
if (LeaderElector.getSeq(thisNode) == newSeq) {
+ log.debug("The node {} has the same sequence number as the leader. Requeing it...", thisNode);
String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
+ log.debug("Rejoining election for node {}", thisNode);
rejoinElection(collectionName, slice, thisNode, coreName, false);
+ log.debug("Waiting for node {} to change", thisNode);
waitForNodeChange(collectionName, slice, thisNode);
+ log.debug("Got node change");
}
}
}
@@ -237,19 +262,21 @@
int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
String nodeName = LeaderElector.getNodeName(electionNode);
int oldSeq = LeaderElector.getSeq(electionNode);
+ log.debug("Waiting for sequence to be different from {}", oldSeq);
+ final String shardLeadersElectPath = ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName());
for (int idx = 0; idx < 600; ++idx) {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+ shardLeadersElectPath);
for (String testNode : electionNodes) {
if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
return LeaderElector.getSeq(testNode);
}
}
-
- Thread.sleep(100);
- }
- return -1;
+ Thread.sleep(100);
+ }
+ log.debug("Waited long enough. Couldn't find updated node");
+ return -1;
}
private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]