This is an automated email from the ASF dual-hosted git repository.
broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 89d911119dd SOLR-17054: Remove unused and duplicate code in DistributedZkUpdateProcessor (#2038)
89d911119dd is described below
commit 89d911119dd0bd7de47644031257047cb505e482
Author: Vincent P <[email protected]>
AuthorDate: Thu Oct 26 10:58:35 2023 +0200
SOLR-17054: Remove unused and duplicate code in DistributedZkUpdateProcessor (#2038)
Co-authored-by: Vincent Primault <[email protected]>
---
.../processor/DistributedZkUpdateProcessor.java | 98 ++++------------------
1 file changed, 17 insertions(+), 81 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index a1e491c82bb..35d96a7c14a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -179,15 +179,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
- nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,
Replica.Type.NRT), true);
+ nodes = getCollectionUrls(collection);
if (nodes == null) {
// This could happen if there are only pull replicas
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
- "Unable to distribute commit operation. No replicas available of
types "
- + Replica.Type.TLOG
- + " or "
- + Replica.Type.NRT);
+ "Unable to distribute commit operation. No leader replicas
available.");
}
nodes.removeIf(
@@ -212,13 +209,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
useNodes = nodes;
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
params.set(COMMIT_END_POINT, "leaders");
- if (useNodes != null) {
- params.set(
- DISTRIB_FROM,
- ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(),
req.getCore().getName()));
- cmdDistrib.distribCommit(cmd, useNodes, params);
- issuedDistribCommit = true;
- }
+ params.set(
+ DISTRIB_FROM,
+ ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(),
req.getCore().getName()));
+ cmdDistrib.distribCommit(cmd, useNodes, params);
+ issuedDistribCommit = true;
}
if (isLeader) {
@@ -232,7 +227,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(COMMIT_END_POINT, "replicas");
- useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(),
leaderReplica);
+ useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(),
leaderReplica, 0);
if (useNodes != null) {
params.set(
@@ -786,52 +781,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
- String leaderCoreNodeName = leaderReplica.getName();
- List<Replica> replicas =
- clusterState
- .getCollection(collection)
- .getSlice(shardId)
- .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- replicas.removeIf((replica) ->
replica.getName().equals(leaderCoreNodeName));
- if (replicas.isEmpty()) {
- return null;
- }
-
- // check for test param that lets us miss replicas
- String[] skipList =
req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
- Set<String> skipListSet = null;
- if (skipList != null) {
- skipListSet = CollectionUtil.newHashSet(skipList.length);
- skipListSet.addAll(Arrays.asList(skipList));
- log.info("test.distrib.skip.servers was found and contains:{}",
skipListSet);
- }
-
- List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
- skippedCoreNodeNames = new HashSet<>();
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection,
shardId);
- for (Replica replica : replicas) {
- String coreNodeName = replica.getName();
- if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
- if (log.isInfoEnabled()) {
- log.info("check url:{} against:{} result:true",
replica.getCoreUrl(), skipListSet);
- }
- } else if (zkShardTerms.registered(coreNodeName)
- && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
- if (log.isDebugEnabled()) {
- log.debug("skip url:{} cause its term is less than leader",
replica.getCoreUrl());
- }
- skippedCoreNodeNames.add(replica.getName());
- } else if
(!clusterState.getLiveNodes().contains(replica.getNodeName())
- || replica.getState() == Replica.State.DOWN) {
- skippedCoreNodeNames.add(replica.getName());
- } else {
- nodes.add(
- new SolrCmdDistributor.StdNode(
- new ZkCoreNodeProps(replica), collection, shardId,
maxRetriesToFollowers));
- }
- }
- return nodes;
-
+ return getReplicaNodesForLeader(shardId, leaderReplica,
maxRetriesToFollowers);
} else {
// I need to forward on to the leader...
forwardToLeader = true;
@@ -883,8 +833,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
}
- private List<SolrCmdDistributor.Node> getCollectionUrls(
- String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
+ private List<SolrCmdDistributor.Node> getCollectionUrls(String collection) {
final DocCollection docCollection =
clusterState.getCollectionOrNull(collection);
if (docCollection == null || docCollection.getSlicesMap() == null) {
throw new ZooKeeperException(
@@ -894,24 +843,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
final List<SolrCmdDistributor.Node> urls = new ArrayList<>(slices.size());
for (Map.Entry<String, Slice> sliceEntry : slices.entrySet()) {
Slice replicas = slices.get(sliceEntry.getKey());
- if (onlyLeaders) {
- Replica replica = docCollection.getLeader(replicas.getName());
- if (replica != null) {
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
- urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection,
replicas.getName()));
- }
- continue;
- }
- Map<String, Replica> shardMap = replicas.getReplicasMap();
-
- for (Map.Entry<String, Replica> entry : shardMap.entrySet()) {
- if (!types.contains(entry.getValue().getType())) {
- continue;
- }
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
- if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
- urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection,
replicas.getName()));
- }
+ Replica replica = docCollection.getLeader(replicas.getName());
+ if (replica != null) {
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
+ urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection,
replicas.getName()));
}
}
if (urls.isEmpty()) {
@@ -959,7 +894,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(
- String shardId, Replica leaderReplica) {
+ String shardId, Replica leaderReplica, int maxRetries) {
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas =
clusterState
@@ -1000,7 +935,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(
- new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica),
collection, shardId));
+ new SolrCmdDistributor.StdNode(
+ new ZkCoreNodeProps(replica), collection, shardId,
maxRetries));
}
}
return nodes;