This is an automated email from the ASF dual-hosted git repository.
tflobbe pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new fd80652355e SOLR-16719: Let AffinityPlacementFactory have a
spread_domain label (#1577)
fd80652355e is described below
commit fd80652355e0aa7d1babd5b0f74febc3da659e5b
Author: Tomas Eduardo Fernandez Lobbe <[email protected]>
AuthorDate: Wed Apr 26 12:49:47 2023 -0700
SOLR-16719: Let AffinityPlacementFactory have a spread_domain label (#1577)
Add the option in AffinityPlacementFactory to spread replicas across
domains within each AZ. Optionally, the request can fail if more than a
configurable number of replicas will be placed in a single spread domain
---------
Co-authored-by: Houston Putman <[email protected]>
---
solr/CHANGES.txt | 3 +
.../placement/plugins/AffinityPlacementConfig.java | 36 +++
.../plugins/AffinityPlacementFactory.java | 329 ++++++++++++++++++---
.../plugins/AffinityPlacementFactoryTest.java | 280 ++++++++++++++++--
4 files changed, 582 insertions(+), 66 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bec7bdacf68..9bf324b4891 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,6 +17,9 @@ New Features
* SOLR-16674: Introduced support for byte vector encoding in DenseVectorField
and KnnQParser (Elia Porciani via Alessandro Benedetti).
+* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across
domains within the availability zone and
+ optionally fail the request if more than a configurable number of replicas
need to be placed in a single domain. (Houston Putman, Tomás Fernández Löbbe)
+
Improvements
---------------------
diff --git
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
index 3fa984259be..b561b1f24aa 100644
---
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
+++
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
@@ -58,6 +58,14 @@ public class AffinityPlacementConfig implements
PlacementPluginConfig {
*/
public static final String NODE_TYPE_SYSPROP = "node_type";
+ /**
+ * Name of the system property on a node indicating the spread domain group.
This is used (if
+ * {@link #spreadAcrossDomains} is set to true) to indicate this placement
plugin that replicas
+ * for a particular shard should spread across nodes that have different
values for this system
+ * property.
+ */
+ public static final String SPREAD_DOMAIN_SYSPROP = "spread_domain";
+
/**
* This is the "AZ" name for nodes that do not define an AZ. Should not
match a real AZ name (I
* think we're safe)
@@ -96,6 +104,34 @@ public class AffinityPlacementConfig implements
PlacementPluginConfig {
*/
@JsonProperty public Map<String, String> collectionNodeType;
+ /**
+ * When this property is set to {@code true}, Solr will try to place
replicas for the same shard
+ * in nodes that have different value for the {@link #SPREAD_DOMAIN_SYSPROP}
System property. If
+ * more replicas exist (or are being placed) than the number of different
values for {@link
+ * #SPREAD_DOMAIN_SYSPROP} System property in nodes in the cluster, Solr
will attempt to
+ * distribute the placement of the replicas evenly across the domains but
will fail the placement
+ * if more than {@link #maxReplicasPerShardInDomain} are placed within a
single domain. Note that
+ * the domain groups are evaluated within a particular AZ (i.e. Solr will
not consider the
+ * placement of replicas in AZ1 when selecting candidate nodes for replicas
in AZ2). Example
+ * usages for this config are:
+ *
+ * <ul>
+ * <li>Rack diversity: You want replicas in different AZs but also, within
the AZ you want them
+ * in different racks
+ * <li>Host diversity: You are running multiple Solr instances in the same
physical host.
+ * You want replicas in different AZs but also, within an AZ you want
replicas for the same
+ * shard to go in nodes that run in different hosts
+ * </ul>
+ */
+ @JsonProperty public Boolean spreadAcrossDomains = Boolean.FALSE;
+
+ /**
+ * Determines the maximum number of replicas of a particular type of a
particular shard that can
+ * be placed within a single domain (as defined by the {@link
#SPREAD_DOMAIN_SYSPROP} System
+ * property.
+ */
+ @JsonProperty public Integer maxReplicasPerShardInDomain = -1;
+
/** Zero-arguments public constructor required for deserialization - don't
use. */
public AffinityPlacementConfig() {
this(DEFAULT_MINIMAL_FREE_DISK_GB, DEFAULT_PRIORITIZED_FREE_DISK_GB);
diff --git
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
index 40a3e502b09..1ec0de525c7 100644
---
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
+++
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -36,6 +36,7 @@ import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.solr.cluster.Cluster;
@@ -170,7 +171,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
config.minimalFreeDiskGB,
config.prioritizedFreeDiskGB,
config.withCollection,
- config.collectionNodeType);
+ config.collectionNodeType,
+ config.spreadAcrossDomains);
}
@Override
@@ -204,6 +206,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
private final Random replicaPlacementRandom =
new Random(); // ok even if random sequence is predictable.
+ private final boolean spreadAcrossDomains;
+
/**
* The factory has decoded the configuration for the plugin instance and
passes it the
* parameters it needs.
@@ -212,11 +216,13 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
long minimalFreeDiskGB,
long prioritizedFreeDiskGB,
Map<String, String> withCollections,
- Map<String, String> collectionNodeTypes) {
+ Map<String, String> collectionNodeTypes,
+ boolean spreadAcrossDomains) {
this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
Objects.requireNonNull(withCollections, "withCollections must not be
null");
Objects.requireNonNull(collectionNodeTypes, "collectionNodeTypes must
not be null");
+ this.spreadAcrossDomains = spreadAcrossDomains;
this.withCollections = withCollections;
if (withCollections.isEmpty()) {
colocatedWith = Map.of();
@@ -265,7 +271,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
attributeFetcher
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
.requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
-
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP);
+
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
attributeFetcher
.requestNodeMetric(NodeMetricImpl.NUM_CORES)
.requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
@@ -275,6 +282,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
// to not end up always selecting the same node(s). This is used across
placement requests
Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes,
attrValues);
+ boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes,
attrValues);
+
// Keep track with nodesWithReplicas across requests
Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new
HashMap<>();
for (PlacementRequest request : requests) {
@@ -344,7 +353,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
nodesWithReplicas,
allCoresOnNodes,
placementContext.getPlacementPlanFactory(),
- replicaPlacements);
+ replicaPlacements,
+ doSpreadAcrossDomains);
}
}
placementPlans.add(
@@ -356,6 +366,27 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
return placementPlans;
}
+ private boolean shouldSpreadAcrossDomains(Set<Node> allNodes,
AttributeValues attrValues) {
+ boolean doSpreadAcrossDomains =
+ spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
+ if (spreadAcrossDomains && !doSpreadAcrossDomains) {
+ log.warn(
+ "AffinityPlacementPlugin configured to spread across domains, but
there are nodes in the cluster without the {} system property. Ignoring
spreadAcrossDomains.",
+ AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+ }
+ return doSpreadAcrossDomains;
+ }
+
+ private boolean spreadDomainPropPresent(Set<Node> allNodes,
AttributeValues attrValues) {
+ // We can only use spread domains if all nodes have the system property
+ return allNodes.stream()
+ .noneMatch(
+ n ->
+ attrValues
+ .getSystemProperty(n,
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+ .isEmpty());
+ }
+
@Override
public void verifyAllowedModification(
ModificationRequest modificationRequest, PlacementContext
placementContext)
@@ -503,15 +534,187 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
*/
private static class AzWithNodes {
final String azName;
- List<Node> availableNodesForPlacement;
- boolean hasBeenSorted;
-
- AzWithNodes(String azName, List<Node> availableNodesForPlacement) {
+ private final boolean useSpreadDomains;
+ private boolean listIsSorted = false;
+ private final Comparator<Node> nodeComparator;
+ private final Random random;
+ private final List<Node> availableNodesForPlacement;
+ private final AttributeValues attributeValues;
+ private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
+ private final Map<String, Integer> currentSpreadDomainUsageUsage;
+ private int numNodesForPlacement;
+
+ AzWithNodes(
+ String azName,
+ List<Node> availableNodesForPlacement,
+ boolean useSpreadDomains,
+ Comparator<Node> nodeComparator,
+ Random random,
+ AttributeValues attributeValues,
+ Map<String, Integer> currentSpreadDomainUsageUsage) {
this.azName = azName;
this.availableNodesForPlacement = availableNodesForPlacement;
- // Once the list is sorted to an order we're happy with, this flag is
set to true to avoid
- // sorting multiple times unnecessarily.
- this.hasBeenSorted = false;
+ this.useSpreadDomains = useSpreadDomains;
+ this.nodeComparator = nodeComparator;
+ this.random = random;
+ this.attributeValues = attributeValues;
+ this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
+ this.numNodesForPlacement = availableNodesForPlacement.size();
+ }
+
+ private boolean hasBeenSorted() {
+ return (useSpreadDomains && sortedSpreadDomains != null)
+ || (!useSpreadDomains && listIsSorted);
+ }
+
+ void ensureSorted() {
+ if (!hasBeenSorted()) {
+ sort();
+ }
+ }
+
+ private void sort() {
+ assert !listIsSorted && sortedSpreadDomains == null
+ : "We shouldn't be sorting this list again";
+
+ // Make sure we do not tend to use always the same nodes (within an
AZ) if all
+ // conditions are identical (well, this likely is not the case since
after having added
+ // a replica to a node its number of cores increases for the next
placement decision,
+ // but let's be defensive here, given that multiple concurrent
placement decisions might
+ // see the same initial cluster state, and we want placement to be
reasonable even in
+ // that case without creating an unnecessary imbalance). For example,
if all nodes have
+ // 0 cores and same amount of free disk space, ideally we want to pick
a random node for
+ // placement, not always the same one due to some internal ordering.
+ Collections.shuffle(availableNodesForPlacement, random);
+
+ if (useSpreadDomains) {
+ // When we use spread domains, we don't just sort the list of nodes,
instead we generate a
+ // TreeSet of SpreadDomainWithNodes,
+ // sorted by the number of times the domain has been used. Each
+ // SpreadDomainWithNodes internally contains the list of nodes that
belong to that
+ // particular domain,
+ // and it's sorted internally by the comparator passed to this
+ // class (which is the same that's used when not using spread
domains).
+ // Whenever a node from a particular SpreadDomainWithNodes is
selected as the best
+ // candidate, the call to "removeBestNode" will:
+ // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
+ // 2. Remove the best node from the list within the
SpreadDomainWithNodes
+ // 3. Increment the count of times the domain has been used
+ // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if
there are still nodes
+ // available
+ HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new
HashMap<>();
+ for (Node node : availableNodesForPlacement) {
+ spreadDomainToListOfNodesMap
+ .computeIfAbsent(
+ attributeValues
+ .getSystemProperty(node,
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+ .get(),
+ k -> new ArrayList<>())
+ .add(node);
+ }
+ sortedSpreadDomains =
+ new TreeSet<>(new
SpreadDomainComparator(currentSpreadDomainUsageUsage));
+
+ int i = 0;
+ for (Map.Entry<String, List<Node>> entry :
spreadDomainToListOfNodesMap.entrySet()) {
+ // Sort the nodes within the spread domain by the provided
comparator
+ entry.getValue().sort(nodeComparator);
+ sortedSpreadDomains.add(
+ new SpreadDomainWithNodes(entry.getKey(), entry.getValue(),
i++, nodeComparator));
+ }
+ } else {
+ availableNodesForPlacement.sort(nodeComparator);
+ listIsSorted = true;
+ }
+ }
+
+ Node getBestNode() {
+ assert hasBeenSorted();
+ if (useSpreadDomains) {
+ return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
+ } else {
+ return availableNodesForPlacement.get(0);
+ }
+ }
+
+ public Node removeBestNode() {
+ assert hasBeenSorted();
+ this.numNodesForPlacement--;
+ if (useSpreadDomains) {
+ // Since this SpreadDomainWithNodes needs to be re-sorted in the
sortedSpreadDomains, we
+ // remove it and then re-add it, once the best node has been removed.
+ SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
+ Node n = group.sortedNodesForPlacement.remove(0);
+ this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1,
Integer::sum);
+ if (!group.sortedNodesForPlacement.isEmpty()) {
+ sortedSpreadDomains.add(group);
+ }
+ return n;
+ } else {
+ return availableNodesForPlacement.remove(0);
+ }
+ }
+
+ public int numNodes() {
+ return this.numNodesForPlacement;
+ }
+ }
+
+ /**
+ * This class represents group of nodes with the same {@link
+ * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+ */
+ static class SpreadDomainWithNodes implements
Comparable<SpreadDomainWithNodes> {
+
+ /**
+ * This is the label that all nodes in this group have in {@link
+ * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+ */
+ final String spreadDomainName;
+
+ /**
+ * The list of all nodes that contain the same {@link
+ * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be
sorted before creating
+ * this class.
+ */
+ private final List<Node> sortedNodesForPlacement;
+
+ /**
+ * This is used for tie breaking the sort of {@link
SpreadDomainWithNodes}, when the
+ * nodeComparator between the top nodes of each group return 0.
+ */
+ private final int tieBreaker;
+
+ /**
+ * This is the comparator that is used to compare the top nodes in the
{@link
+ * #sortedNodesForPlacement} lists. Must be the same that was used to
sort {@link
+ * #sortedNodesForPlacement}.
+ */
+ private final Comparator<Node> nodeComparator;
+
+ public SpreadDomainWithNodes(
+ String spreadDomainName,
+ List<Node> sortedNodesForPlacement,
+ int tieBreaker,
+ Comparator<Node> nodeComparator) {
+ this.spreadDomainName = spreadDomainName;
+ this.sortedNodesForPlacement = sortedNodesForPlacement;
+ this.tieBreaker = tieBreaker;
+ this.nodeComparator = nodeComparator;
+ }
+
+ @Override
+ public int compareTo(SpreadDomainWithNodes o) {
+ if (o == this) {
+ return 0;
+ }
+ int result =
+ nodeComparator.compare(
+ this.sortedNodesForPlacement.get(0),
o.sortedNodesForPlacement.get(0));
+ if (result == 0) {
+ return Integer.compare(this.tieBreaker, o.tieBreaker);
+ }
+ return result;
}
}
@@ -658,7 +861,8 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
Set<Node> nodesWithReplicas,
Map<Node, Integer> coresOnNodes,
PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements)
+ Set<ReplicaPlacement> replicaPlacements,
+ boolean doSpreadAcrossDomains)
throws PlacementException {
// Count existing replicas per AZ. We count only instances of the type
of replica for which we
// need to do placement. If we ever want to balance replicas of any type
across AZ's (and not
@@ -679,6 +883,9 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
// be put on same node)
candidateNodes.removeAll(nodesWithReplicas);
+ // This Map will include the affinity labels for the nodes that are
currently hosting replicas
+ // of this shard. It will be modified with new placement decisions.
+ Map<String, Integer> spreadDomainsInUse = new HashMap<>();
Shard shard = solrCollection.getShard(shardName);
if (shard != null) {
// shard is non null if we're adding replicas to an already existing
collection.
@@ -694,6 +901,15 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
// the dereferencing below can't be assumed as the entry will
not exist in the map.
azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
}
+ if (doSpreadAcrossDomains) {
+ spreadDomainsInUse.merge(
+ attrValues
+ .getSystemProperty(
+ replica.getNode(),
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+ .get(),
+ 1,
+ Integer::sum);
+ }
}
}
}
@@ -703,15 +919,25 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
// consideration how many replicas were per AZ, so we can place (or try
to place) replicas on
// AZ's that have fewer replicas
- // Get the candidate nodes per AZ in order to build (further down) a
mapping of AZ to
- // placement candidates.
Map<String, List<Node>> nodesPerAz = new HashMap<>();
- for (Node node : candidateNodes) {
- String nodeAz = getNodeAZ(node, attrValues);
- List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new
ArrayList<>());
- nodesForAz.add(node);
+ if (availabilityZones.size() == 1) {
+ // If AZs are not being used (all undefined for example) or a single
AZ exists, we add all
+ // nodes
+ // to the same entry
+ nodesPerAz.put(availabilityZones.iterator().next(), new
ArrayList<>(candidateNodes));
+ } else {
+ // Get the candidate nodes per AZ in order to build (further down) a
mapping of AZ to
+ // placement candidates.
+ for (Node node : candidateNodes) {
+ String nodeAz = getNodeAZ(node, attrValues);
+ List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new
ArrayList<>());
+ nodesForAz.add(node);
+ }
}
+ Comparator<Node> interGroupNodeComparator =
+ new CoresAndDiskComparator(attrValues, coresOnNodes,
prioritizedFreeDiskGB);
+
// Build a treeMap sorted by the number of replicas per AZ and including
candidates nodes
// suitable for placement on the AZ, so we can easily select the next AZ
to get a replica
// assignment and quickly (constant time) decide if placement on this AZ
is possible or not.
@@ -720,12 +946,17 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
azByExistingReplicas
.computeIfAbsent(azToNumReplicas.get(e.getKey()), k -> new
HashSet<>())
- .add(new AzWithNodes(e.getKey(), e.getValue()));
+ .add(
+ new AzWithNodes(
+ e.getKey(),
+ e.getValue(),
+ doSpreadAcrossDomains,
+ interGroupNodeComparator,
+ replicaPlacementRandom,
+ attrValues,
+ spreadDomainsInUse));
}
- CoresAndDiskComparator coresAndDiskComparator =
- new CoresAndDiskComparator(attrValues, coresOnNodes,
prioritizedFreeDiskGB);
-
for (int i = 0; i < numReplicas; i++) {
// We have for each AZ on which we might have a chance of placing a
replica, the list of
// candidate nodes for replicas (candidate: does not already have a
replica of this shard
@@ -745,7 +976,7 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
Iterator<AzWithNodes> it = mapEntry.getValue().iterator();
while (it.hasNext()) {
Map.Entry<Integer, AzWithNodes> entry =
Map.entry(mapEntry.getKey(), it.next());
- int numberOfNodes =
entry.getValue().availableNodesForPlacement.size();
+ int numberOfNodes = entry.getValue().numNodes();
if (numberOfNodes == 0) {
it.remove();
} else { // AZ does have node(s) for placement
@@ -788,38 +1019,20 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
Node selectedAzBestNode = null;
for (Map.Entry<Integer, AzWithNodes> candidateAzEntry :
candidateAzEntries) {
AzWithNodes azWithNodes = candidateAzEntry.getValue();
- List<Node> nodes = azWithNodes.availableNodesForPlacement;
-
- if (!azWithNodes.hasBeenSorted) {
- // Make sure we do not tend to use always the same nodes (within
an AZ) if all
- // conditions are identical (well, this likely is not the case
since after having added
- // a replica to a node its number of cores increases for the next
placement decision,
- // but let's be defensive here, given that multiple concurrent
placement decisions might
- // see the same initial cluster state, and we want placement to be
reasonable even in
- // that case without creating an unnecessary imbalance). For
example, if all nodes have
- // 0 cores and same amount of free disk space, ideally we want to
pick a random node for
- // placement, not always the same one due to some internal
ordering.
- Collections.shuffle(nodes, replicaPlacementRandom);
-
- // Sort by increasing number of cores but pushing nodes with low
free disk space to the
- // end of the list
- nodes.sort(coresAndDiskComparator);
-
- azWithNodes.hasBeenSorted = true;
- }
+ azWithNodes.ensureSorted();
// Which one is better, the new one or the previous best?
if (selectedAz == null
- || coresAndDiskComparator.compare(nodes.get(0),
selectedAzBestNode) < 0) {
+ || interGroupNodeComparator.compare(azWithNodes.getBestNode(),
selectedAzBestNode)
+ < 0) {
selectedAz = candidateAzEntry;
- selectedAzBestNode = nodes.get(0);
+ selectedAzBestNode = azWithNodes.getBestNode();
}
}
// Now actually remove the selected node from the winning AZ
AzWithNodes azWithNodes = selectedAz.getValue();
- List<Node> nodes = selectedAz.getValue().availableNodesForPlacement;
- Node assignTarget = nodes.remove(0);
+ Node assignTarget = azWithNodes.removeBestNode();
// Insert back all the qualifying but non winning AZ's removed while
searching for the one
for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
@@ -971,5 +1184,29 @@ public class AffinityPlacementFactory implements
PlacementPluginFactory<Affinity
return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
}
}
+
+ static class SpreadDomainComparator implements
Comparator<SpreadDomainWithNodes> {
+ private final Map<String, Integer> spreadDomainUsage;
+
+ SpreadDomainComparator(Map<String, Integer> spreadDomainUsage) {
+ this.spreadDomainUsage = spreadDomainUsage;
+ }
+
+ @Override
+ public int compare(SpreadDomainWithNodes group1, SpreadDomainWithNodes
group2) {
+ // This comparator will compare groups by:
+ // 1. The number of usages for the domain they represent: We want
groups that are
+ // less used to be the best ones
+ // 2. On equal number of usages, by the internal comparator (which
uses core count and disk
+ // space) on the best node for each group (which, since the list is
sorted, it's always the
+ // one in the position 0)
+ Integer usagesLabel1 =
spreadDomainUsage.getOrDefault(group1.spreadDomainName, 0);
+ Integer usagesLabel2 =
spreadDomainUsage.getOrDefault(group2.spreadDomainName, 0);
+ if (usagesLabel1.equals(usagesLabel2)) {
+ return group1.compareTo(group2);
+ }
+ return usagesLabel1.compareTo(usagesLabel2);
+ }
+ }
}
}
diff --git
a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
index 7f9c48301e9..0e1665116ac 100644
---
a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
+++
b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -18,6 +18,8 @@
package org.apache.solr.cluster.placement.plugins;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -1076,40 +1078,63 @@ public class AffinityPlacementFactoryTest extends
SolrTestCaseJ4 {
int pullReplicas = TEST_NIGHTLY ? 20 : 2;
log.info("==== numNodes ====");
- runTestScalability(1000, numShards, nrtReplicas, tlogReplicas,
pullReplicas);
- runTestScalability(2000, numShards, nrtReplicas, tlogReplicas,
pullReplicas);
- runTestScalability(5000, numShards, nrtReplicas, tlogReplicas,
pullReplicas);
- runTestScalability(10000, numShards, nrtReplicas, tlogReplicas,
pullReplicas);
- runTestScalability(20000, numShards, nrtReplicas, tlogReplicas,
pullReplicas);
+ runTestScalability(1000, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(2000, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(5000, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(10000, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(20000, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
log.info("==== numShards ====");
int numNodes = TEST_NIGHTLY ? 5000 : 500;
- runTestScalability(numNodes, 100, nrtReplicas, tlogReplicas, pullReplicas);
- runTestScalability(numNodes, 200, nrtReplicas, tlogReplicas, pullReplicas);
- runTestScalability(numNodes, 500, nrtReplicas, tlogReplicas, pullReplicas);
+ runTestScalability(numNodes, 100, nrtReplicas, tlogReplicas, pullReplicas,
false);
+ runTestScalability(numNodes, 200, nrtReplicas, tlogReplicas, pullReplicas,
false);
+ runTestScalability(numNodes, 500, nrtReplicas, tlogReplicas, pullReplicas,
false);
if (TEST_NIGHTLY) {
- runTestScalability(numNodes, 1000, nrtReplicas, tlogReplicas,
pullReplicas);
- runTestScalability(numNodes, 2000, nrtReplicas, tlogReplicas,
pullReplicas);
+ runTestScalability(numNodes, 1000, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(numNodes, 2000, nrtReplicas, tlogReplicas,
pullReplicas, false);
}
log.info("==== numReplicas ====");
- runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 100 : 10, 0, 0);
- runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 200 : 20, 0, 0);
- runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 500 : 50, 0, 0);
- runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 1000 : 30, 0, 0);
- runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 2000 : 50, 0, 0);
+ runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 100 : 10, 0, 0,
false);
+ runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 200 : 20, 0, 0,
false);
+ runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 500 : 50, 0, 0,
false);
+ runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 1000 : 30, 0, 0,
false);
+ runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 2000 : 50, 0, 0,
false);
+
+ log.info("==== spread domains ====");
+ runTestScalability(numNodes, numShards, nrtReplicas, tlogReplicas,
pullReplicas, false);
+ runTestScalability(numNodes, numShards, nrtReplicas, tlogReplicas,
pullReplicas, true);
}
private void runTestScalability(
- int numNodes, int numShards, int nrtReplicas, int tlogReplicas, int
pullReplicas)
+ int numNodes,
+ int numShards,
+ int nrtReplicas,
+ int tlogReplicas,
+ int pullReplicas,
+ boolean useSpreadDomains)
throws Exception {
String collectionName = "scaleCollection";
+ if (useSpreadDomains) {
+ defaultConfig.spreadAcrossDomains = true;
+ configurePlugin(defaultConfig);
+ } else {
+ defaultConfig.spreadAcrossDomains = false;
+ configurePlugin(defaultConfig);
+ }
+
+ int numSpreadDomains = 5;
+
Builders.ClusterBuilder clusterBuilder =
Builders.newClusterBuilder().initializeLiveNodes(numNodes);
List<Builders.NodeBuilder> nodeBuilders =
clusterBuilder.getLiveNodeBuilders();
for (int i = 0; i < numNodes; i++) {
nodeBuilders.get(i).setCoreCount(0).setFreeDiskGB((double) numNodes);
+ nodeBuilders
+ .get(i)
+ .setSysprop(
+ AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, String.valueOf(i
% numSpreadDomains));
}
Builders.CollectionBuilder collectionBuilder =
Builders.newCollectionBuilder(collectionName);
@@ -1118,6 +1143,8 @@ public class AffinityPlacementFactoryTest extends
SolrTestCaseJ4 {
PlacementContext placementContext = clusterBuilder.buildPlacementContext();
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+ Set<Node> setNodes = new HashSet<>(liveNodes);
+ assertEquals(setNodes.size(), liveNodes.size());
// Place replicas for all the shards of the (newly created since it has no
replicas yet)
// collection
@@ -1138,10 +1165,11 @@ public class AffinityPlacementFactoryTest extends
SolrTestCaseJ4 {
final int TOTAL_REPLICAS = numShards * REPLICAS_PER_SHARD;
log.info(
- "ComputePlacement: {} nodes, {} shards, {} total replicas, elapsed
time {} ms.",
+ "ComputePlacement: {} nodes, {} shards, {} total replicas,
spread-domains={}, elapsed time {} ms.",
numNodes,
numShards,
TOTAL_REPLICAS,
+ useSpreadDomains,
TimeUnit.NANOSECONDS.toMillis(end - start)); // nowarn
assertEquals(
"incorrect number of calculated placements",
@@ -1169,16 +1197,228 @@ public class AffinityPlacementFactoryTest extends
SolrTestCaseJ4 {
int perNode = TOTAL_REPLICAS > numNodes ? TOTAL_REPLICAS / numNodes : 1;
replicasPerNode.forEach(
(node, count) -> {
- assertEquals(count.get(), perNode);
+ assertEquals(perNode, count.get());
});
shardsPerNode.forEach(
(node, names) -> {
- assertEquals(names.size(), perNode);
+ assertEquals(perNode, names.size());
});
replicasPerShard.forEach(
(shard, count) -> {
- assertEquals(count.get(), REPLICAS_PER_SHARD);
+ assertEquals(REPLICAS_PER_SHARD, count.get());
});
}
+
+ @Test
+ public void testAntiAffinityIsSoft() throws Exception {
+ defaultConfig.spreadAcrossDomains = true;
+ configurePlugin(defaultConfig);
+ String collectionName = "basicCollection";
+
+ Builders.ClusterBuilder clusterBuilder =
Builders.newClusterBuilder().initializeLiveNodes(2);
+ List<Builders.NodeBuilder> nodeBuilders =
clusterBuilder.getLiveNodeBuilders();
+ // Anti-affinity can't be achieved, since all nodes in the cluster have
the same value
+ nodeBuilders
+ .get(0)
+ .setCoreCount(1)
+ .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+ .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+ nodeBuilders
+ .get(1)
+ .setCoreCount(2)
+ .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+ .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+
+ Builders.CollectionBuilder collectionBuilder =
Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+
+ PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+ SolrCollection solrCollection = collectionBuilder.build();
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ {
+ // Place a new replica for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 1,
+ 0,
+ 0);
+
+ PlacementPlan pp = plugin.computePlacement(placementRequest,
placementContext);
+
+ assertEquals(1, pp.getReplicaPlacements().size());
+ ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+ assertEquals(liveNodes.get(0), rp.getNode());
+ }
+ {
+ // Place two replicas for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 2,
+ 0,
+ 0);
+
+ PlacementPlan pp = plugin.computePlacement(placementRequest,
placementContext);
+
+ assertEquals(2, pp.getReplicaPlacements().size());
+ assertEquals(
+ Set.of(liveNodes.get(0), liveNodes.get(1)),
+ pp.getReplicaPlacements().stream()
+ .map(ReplicaPlacement::getNode)
+ .collect(Collectors.toSet()));
+ }
+ }
+
+ @Test
+ public void testSpreadDomainsWithExistingCollection() throws Exception {
+ testSpreadDomains(true);
+ }
+
+ @Test
+ public void testSpreadDomainsWithEmptyCluster() throws Exception {
+ testSpreadDomains(false);
+ }
+
+ private void testSpreadDomains(boolean hasExistingCollection) throws
Exception {
+ defaultConfig.spreadAcrossDomains = true;
+ configurePlugin(defaultConfig);
+ String collectionName = "basicCollection";
+
+ Builders.ClusterBuilder clusterBuilder =
Builders.newClusterBuilder().initializeLiveNodes(3);
+ List<Builders.NodeBuilder> nodeBuilders =
clusterBuilder.getLiveNodeBuilders();
+ nodeBuilders
+ .get(0)
+ .setCoreCount(1)
+ .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+ .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+ nodeBuilders
+ .get(1)
+ .setCoreCount(2)
+ .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+ .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+ nodeBuilders
+ .get(2)
+ .setCoreCount(3)
+ .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+ .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "B");
+
+ Builders.CollectionBuilder collectionBuilder =
Builders.newCollectionBuilder(collectionName);
+
+ if (hasExistingCollection) {
+ // Existing collection has replicas for its shards and is visible in the
cluster state
+ collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+ } else {
+ // New collection to create has the shards defined but no replicas and
is not present in
+ // cluster state
+ collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+ }
+
+ PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+ SolrCollection solrCollection = collectionBuilder.build();
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ {
+ // Place a new replica for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 1,
+ 0,
+ 0);
+
+ PlacementPlan pp = plugin.computePlacement(placementRequest,
placementContext);
+
+ assertEquals(1, pp.getReplicaPlacements().size());
+ ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+ assertEquals(hasExistingCollection ? liveNodes.get(2) :
liveNodes.get(0), rp.getNode());
+ }
+ {
+ // Place two replicas for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 2,
+ 0,
+ 0);
+
+ PlacementPlan pp = plugin.computePlacement(placementRequest,
placementContext);
+
+ assertEquals(2, pp.getReplicaPlacements().size());
+ assertEquals(
+ hasExistingCollection
+ ? Set.of(liveNodes.get(1), liveNodes.get(2))
+ : Set.of(liveNodes.get(0), liveNodes.get(2)),
+ pp.getReplicaPlacements().stream()
+ .map(ReplicaPlacement::getNode)
+ .collect(Collectors.toSet()));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("SelfComparison")
+ public void testCompareSpreadDomainWithNodes() {
+ Builders.ClusterBuilder clusterBuilder =
Builders.newClusterBuilder().initializeLiveNodes(3);
+ final List<Builders.NodeBuilder> nodeBuilders =
clusterBuilder.getLiveNodeBuilders();
+ nodeBuilders.get(0).setNodeName("nodeA");
+ nodeBuilders.get(1).setNodeName("nodeB");
+ nodeBuilders.get(2).setNodeName("nodeC");
+
+ Cluster cluster = clusterBuilder.build();
+ Node nodeA =
+ cluster.getLiveNodes().stream()
+ .filter((n) -> n.getName().equals("nodeA"))
+ .findFirst()
+ .get();
+ Node nodeB =
+ cluster.getLiveNodes().stream()
+ .filter((n) -> n.getName().equals("nodeB"))
+ .findFirst()
+ .get();
+ Node nodeC =
+ cluster.getLiveNodes().stream()
+ .filter((n) -> n.getName().equals("nodeC"))
+ .findFirst()
+ .get();
+
+ Comparator<Node> nodeComparator = Comparator.comparing(Node::getName);
+ List<Node> listInGroup1 = new ArrayList<>(List.of(nodeC, nodeA));
+ AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes
group1 =
+ new
AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
+ "foo", listInGroup1, 0, nodeComparator);
+ AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes
group2 =
+ new
AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
+ "bar", List.of(nodeB), 1, nodeComparator);
+ assertEquals("Comparing to itself should return 0", 0,
group1.compareTo(group1));
+ assertEquals(
+ "group 1 should be greater, since 'nodeC' is greater than 'nodeB'",
+ 1,
+ group1.compareTo(group2));
+ assertEquals(
+ "group 1 should be greater, since 'nodeC' is greater than 'nodeB'",
+ -1,
+ group2.compareTo(group1));
+ listInGroup1.remove(0);
+ assertEquals(
+ "group 2 should be greater, since 'nodeB' is greater than 'nodeA'",
+ -1,
+ group1.compareTo(group2));
+ listInGroup1.remove(0);
+ listInGroup1.add(nodeB);
+ assertEquals(
+ "group 2 should be greater because of the tie breaker", -1,
group1.compareTo(group2));
+ }
}