This is an automated email from the ASF dual-hosted git repository.

tflobbe pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new fd80652355e SOLR-16719: Let AffinityPlacementFactory have a 
spread_domain label (#1577)
fd80652355e is described below

commit fd80652355e0aa7d1babd5b0f74febc3da659e5b
Author: Tomas Eduardo Fernandez Lobbe <[email protected]>
AuthorDate: Wed Apr 26 12:49:47 2023 -0700

    SOLR-16719: Let AffinityPlacementFactory have a spread_domain label (#1577)
    
    Add the option in AffinityPlacementFactory to spread replicas across 
domains within each AZ. Optionally, the request can fail if more than a 
configurable number of replicas will be placed in a single spread domain
    
    ---------
    
    Co-authored-by: Houston Putman <[email protected]>
---
 solr/CHANGES.txt                                   |   3 +
 .../placement/plugins/AffinityPlacementConfig.java |  36 +++
 .../plugins/AffinityPlacementFactory.java          | 329 ++++++++++++++++++---
 .../plugins/AffinityPlacementFactoryTest.java      | 280 ++++++++++++++++--
 4 files changed, 582 insertions(+), 66 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bec7bdacf68..9bf324b4891 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,6 +17,9 @@ New Features
 
 * SOLR-16674: Introduced support for byte vector encoding in DenseVectorField 
and KnnQParser (Elia Porciani via Alessandro Benedetti).
 
+* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across 
domains within the availablity zone and
+  optionally fail the request if more than a configurable number of replicas 
need to be placed in a single domain. (Houston Putman, Tomás Fernández Löbbe)
+
 Improvements
 ---------------------
 
diff --git 
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
 
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
index 3fa984259be..b561b1f24aa 100644
--- 
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
+++ 
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java
@@ -58,6 +58,14 @@ public class AffinityPlacementConfig implements 
PlacementPluginConfig {
    */
   public static final String NODE_TYPE_SYSPROP = "node_type";
 
+  /**
+   * Name of the system property on a node indicating the spread domain group. 
This is used (if
+   * {@link #spreadAcrossDomains} is set to true) to indicate this placement 
plugin that replicas
+   * for a particular shard should spread across nodes that have different 
values for this system
+   * property.
+   */
+  public static final String SPREAD_DOMAIN_SYSPROP = "spread_domain";
+
   /**
    * This is the "AZ" name for nodes that do not define an AZ. Should not 
match a real AZ name (I
    * think we're safe)
@@ -96,6 +104,34 @@ public class AffinityPlacementConfig implements 
PlacementPluginConfig {
    */
   @JsonProperty public Map<String, String> collectionNodeType;
 
+  /**
+   * When this property is set to {@code true}, Solr will try to place 
replicas for the same shard
+   * in nodes that have different value for the {@link #SPREAD_DOMAIN_SYSPROP} 
System property. If
+   * more replicas exist (or are being placed) than the number of different 
values for {@link
+   * #SPREAD_DOMAIN_SYSPROP} System property in nodes in the cluster, Solr 
will attempt to
+   * distribute the placement of the replicas evenly across the domains but 
will fail the placement
+   * if more than {@link #maxReplicasPerShardInDomain} are placed within a 
single domain. Note that
+   * the domain groups are evaluated within a particular AZ (i.e. Solr will 
not consider the
+   * placement of replicas in AZ1 when selecting candidate nodes for replicas 
in AZ2). Example
+   * usages for this config are:
+   *
+   * <ul>
+   *   <li>Rack diversity: You want replicas in different AZs but also, within 
the AZ you want them
+   *       in different racks
+   *   <li>Host diversity: You are running multiple Solr instances in the same 
host physical host.
+   *       You want replicas in different AZs but also, within an AZ you want 
replicas for the same
+   *       shard to go in nodes that run in different hosts
+   * </ul>
+   */
+  @JsonProperty public Boolean spreadAcrossDomains = Boolean.FALSE;
+
+  /**
+   * Determines the maximum number of replicas of a particular type of a 
particular shard that can
+   * be placed within a single domain (as defined by the @link 
#SPREAD_DOMAIN_SYSPROP} System
+   * property.
+   */
+  @JsonProperty public Integer maxReplicasPerShardInDomain = -1;
+
   /** Zero-arguments public constructor required for deserialization - don't 
use. */
   public AffinityPlacementConfig() {
     this(DEFAULT_MINIMAL_FREE_DISK_GB, DEFAULT_PRIORITIZED_FREE_DISK_GB);
diff --git 
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
 
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
index 40a3e502b09..1ec0de525c7 100644
--- 
a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
+++ 
b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -36,6 +36,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.solr.cluster.Cluster;
@@ -170,7 +171,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
         config.minimalFreeDiskGB,
         config.prioritizedFreeDiskGB,
         config.withCollection,
-        config.collectionNodeType);
+        config.collectionNodeType,
+        config.spreadAcrossDomains);
   }
 
   @Override
@@ -204,6 +206,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
     private final Random replicaPlacementRandom =
         new Random(); // ok even if random sequence is predictable.
 
+    private final boolean spreadAcrossDomains;
+
     /**
      * The factory has decoded the configuration for the plugin instance and 
passes it the
      * parameters it needs.
@@ -212,11 +216,13 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
         long minimalFreeDiskGB,
         long prioritizedFreeDiskGB,
         Map<String, String> withCollections,
-        Map<String, String> collectionNodeTypes) {
+        Map<String, String> collectionNodeTypes,
+        boolean spreadAcrossDomains) {
       this.minimalFreeDiskGB = minimalFreeDiskGB;
       this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
       Objects.requireNonNull(withCollections, "withCollections must not be 
null");
       Objects.requireNonNull(collectionNodeTypes, "collectionNodeTypes must 
not be null");
+      this.spreadAcrossDomains = spreadAcrossDomains;
       this.withCollections = withCollections;
       if (withCollections.isEmpty()) {
         colocatedWith = Map.of();
@@ -265,7 +271,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       attributeFetcher
           
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
           .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
-          
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP);
+          
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+          
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
       attributeFetcher
           .requestNodeMetric(NodeMetricImpl.NUM_CORES)
           .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
@@ -275,6 +282,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       // to not end up always selecting the same node(s). This is used across 
placement requests
       Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, 
attrValues);
 
+      boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, 
attrValues);
+
       // Keep track with nodesWithReplicas across requests
       Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new 
HashMap<>();
       for (PlacementRequest request : requests) {
@@ -344,7 +353,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
                 nodesWithReplicas,
                 allCoresOnNodes,
                 placementContext.getPlacementPlanFactory(),
-                replicaPlacements);
+                replicaPlacements,
+                doSpreadAcrossDomains);
           }
         }
         placementPlans.add(
@@ -356,6 +366,27 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       return placementPlans;
     }
 
+    private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, 
AttributeValues attrValues) {
+      boolean doSpreadAcrossDomains =
+          spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
+      if (spreadAcrossDomains && !doSpreadAcrossDomains) {
+        log.warn(
+            "AffinityPlacementPlugin configured to spread across domains, but 
there are nodes in the cluster without the {} system property. Ignoring 
spreadAcrossDomains.",
+            AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+      }
+      return doSpreadAcrossDomains;
+    }
+
+    private boolean spreadDomainPropPresent(Set<Node> allNodes, 
AttributeValues attrValues) {
+      // We can only use spread domains if all nodes have the system property
+      return allNodes.stream()
+          .noneMatch(
+              n ->
+                  attrValues
+                      .getSystemProperty(n, 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+                      .isEmpty());
+    }
+
     @Override
     public void verifyAllowedModification(
         ModificationRequest modificationRequest, PlacementContext 
placementContext)
@@ -503,15 +534,187 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
      */
     private static class AzWithNodes {
       final String azName;
-      List<Node> availableNodesForPlacement;
-      boolean hasBeenSorted;
-
-      AzWithNodes(String azName, List<Node> availableNodesForPlacement) {
+      private final boolean useSpreadDomains;
+      private boolean listIsSorted = false;
+      private final Comparator<Node> nodeComparator;
+      private final Random random;
+      private final List<Node> availableNodesForPlacement;
+      private final AttributeValues attributeValues;
+      private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
+      private final Map<String, Integer> currentSpreadDomainUsageUsage;
+      private int numNodesForPlacement;
+
+      AzWithNodes(
+          String azName,
+          List<Node> availableNodesForPlacement,
+          boolean useSpreadDomains,
+          Comparator<Node> nodeComparator,
+          Random random,
+          AttributeValues attributeValues,
+          Map<String, Integer> currentSpreadDomainUsageUsage) {
         this.azName = azName;
         this.availableNodesForPlacement = availableNodesForPlacement;
-        // Once the list is sorted to an order we're happy with, this flag is 
set to true to avoid
-        // sorting multiple times unnecessarily.
-        this.hasBeenSorted = false;
+        this.useSpreadDomains = useSpreadDomains;
+        this.nodeComparator = nodeComparator;
+        this.random = random;
+        this.attributeValues = attributeValues;
+        this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
+        this.numNodesForPlacement = availableNodesForPlacement.size();
+      }
+
+      private boolean hasBeenSorted() {
+        return (useSpreadDomains && sortedSpreadDomains != null)
+            || (!useSpreadDomains && listIsSorted);
+      }
+
+      void ensureSorted() {
+        if (!hasBeenSorted()) {
+          sort();
+        }
+      }
+
+      private void sort() {
+        assert !listIsSorted && sortedSpreadDomains == null
+            : "We shouldn't be sorting this list again";
+
+        // Make sure we do not tend to use always the same nodes (within an 
AZ) if all
+        // conditions are identical (well, this likely is not the case since 
after having added
+        // a replica to a node its number of cores increases for the next 
placement decision,
+        // but let's be defensive here, given that multiple concurrent 
placement decisions might
+        // see the same initial cluster state, and we want placement to be 
reasonable even in
+        // that case without creating an unnecessary imbalance). For example, 
if all nodes have
+        // 0 cores and same amount of free disk space, ideally we want to pick 
a random node for
+        // placement, not always the same one due to some internal ordering.
+        Collections.shuffle(availableNodesForPlacement, random);
+
+        if (useSpreadDomains) {
+          // When we use spread domains, we don't just sort the list of nodes, 
instead we generate a
+          // TreeSet of SpreadDomainWithNodes,
+          // sorted by the number of times the domain has been used. Each
+          // SpreadDomainWithNodes internally contains the list of nodes that 
belong to that
+          // particular domain,
+          // and it's sorted internally by the comparator passed to this
+          // class (which is the same that's used when not using spread 
domains).
+          // Whenever a node from a particular SpreadDomainWithNodes is 
selected as the best
+          // candidate, the call to "removeBestNode" will:
+          // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
+          // 2. Remove the best node from the list within the 
SpreadDomainWithNodes
+          // 3. Increment the count of times the domain has been used
+          // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if 
there are still nodes
+          // available
+          HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new 
HashMap<>();
+          for (Node node : availableNodesForPlacement) {
+            spreadDomainToListOfNodesMap
+                .computeIfAbsent(
+                    attributeValues
+                        .getSystemProperty(node, 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+                        .get(),
+                    k -> new ArrayList<>())
+                .add(node);
+          }
+          sortedSpreadDomains =
+              new TreeSet<>(new 
SpreadDomainComparator(currentSpreadDomainUsageUsage));
+
+          int i = 0;
+          for (Map.Entry<String, List<Node>> entry : 
spreadDomainToListOfNodesMap.entrySet()) {
+            // Sort the nodes within the spread domain by the provided 
comparator
+            entry.getValue().sort(nodeComparator);
+            sortedSpreadDomains.add(
+                new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), 
i++, nodeComparator));
+          }
+        } else {
+          availableNodesForPlacement.sort(nodeComparator);
+          listIsSorted = true;
+        }
+      }
+
+      Node getBestNode() {
+        assert hasBeenSorted();
+        if (useSpreadDomains) {
+          return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
+        } else {
+          return availableNodesForPlacement.get(0);
+        }
+      }
+
+      public Node removeBestNode() {
+        assert hasBeenSorted();
+        this.numNodesForPlacement--;
+        if (useSpreadDomains) {
+          // Since this SpreadDomainWithNodes needs to be re-sorted in the 
sortedSpreadDomains, we
+          // remove it and then re-add it, once the best node has been removed.
+          SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
+          Node n = group.sortedNodesForPlacement.remove(0);
+          this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, 
Integer::sum);
+          if (!group.sortedNodesForPlacement.isEmpty()) {
+            sortedSpreadDomains.add(group);
+          }
+          return n;
+        } else {
+          return availableNodesForPlacement.remove(0);
+        }
+      }
+
+      public int numNodes() {
+        return this.numNodesForPlacement;
+      }
+    }
+
+    /**
+     * This class represents group of nodes with the same {@link
+     * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+     */
+    static class SpreadDomainWithNodes implements 
Comparable<SpreadDomainWithNodes> {
+
+      /**
+       * This is the label that all nodes in this group have in {@link
+       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+       */
+      final String spreadDomainName;
+
+      /**
+       * The list of all nodes that contain the same {@link
+       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be 
sorted before creating
+       * this class.
+       */
+      private final List<Node> sortedNodesForPlacement;
+
+      /**
+       * This is used for tie breaking the sort of {@link 
SpreadDomainWithNodes}, when the
+       * nodeComparator between the top nodes of each group return 0.
+       */
+      private final int tieBreaker;
+
+      /**
+       * This is the comparator that is used to compare the top nodes in the 
{@link
+       * #sortedNodesForPlacement} lists. Must be the same that was used to 
sort {@link
+       * #sortedNodesForPlacement}.
+       */
+      private final Comparator<Node> nodeComparator;
+
+      public SpreadDomainWithNodes(
+          String spreadDomainName,
+          List<Node> sortedNodesForPlacement,
+          int tieBreaker,
+          Comparator<Node> nodeComparator) {
+        this.spreadDomainName = spreadDomainName;
+        this.sortedNodesForPlacement = sortedNodesForPlacement;
+        this.tieBreaker = tieBreaker;
+        this.nodeComparator = nodeComparator;
+      }
+
+      @Override
+      public int compareTo(SpreadDomainWithNodes o) {
+        if (o == this) {
+          return 0;
+        }
+        int result =
+            nodeComparator.compare(
+                this.sortedNodesForPlacement.get(0), 
o.sortedNodesForPlacement.get(0));
+        if (result == 0) {
+          return Integer.compare(this.tieBreaker, o.tieBreaker);
+        }
+        return result;
       }
     }
 
@@ -658,7 +861,8 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
         Set<Node> nodesWithReplicas,
         Map<Node, Integer> coresOnNodes,
         PlacementPlanFactory placementPlanFactory,
-        Set<ReplicaPlacement> replicaPlacements)
+        Set<ReplicaPlacement> replicaPlacements,
+        boolean doSpreadAcrossDomains)
         throws PlacementException {
       // Count existing replicas per AZ. We count only instances of the type 
of replica for which we
       // need to do placement. If we ever want to balance replicas of any type 
across AZ's (and not
@@ -679,6 +883,9 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       // be put on same node)
       candidateNodes.removeAll(nodesWithReplicas);
 
+      // This Map will include the affinity labels for the nodes that are 
currently hosting replicas
+      // of this shard. It will be modified with new placement decisions.
+      Map<String, Integer> spreadDomainsInUse = new HashMap<>();
       Shard shard = solrCollection.getShard(shardName);
       if (shard != null) {
         // shard is non null if we're adding replicas to an already existing 
collection.
@@ -694,6 +901,15 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
               // the dereferencing below can't be assumed as the entry will 
not exist in the map.
               azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
             }
+            if (doSpreadAcrossDomains) {
+              spreadDomainsInUse.merge(
+                  attrValues
+                      .getSystemProperty(
+                          replica.getNode(), 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+                      .get(),
+                  1,
+                  Integer::sum);
+            }
           }
         }
       }
@@ -703,15 +919,25 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       // consideration how many replicas were per AZ, so we can place (or try 
to place) replicas on
       // AZ's that have fewer replicas
 
-      // Get the candidate nodes per AZ in order to build (further down) a 
mapping of AZ to
-      // placement candidates.
       Map<String, List<Node>> nodesPerAz = new HashMap<>();
-      for (Node node : candidateNodes) {
-        String nodeAz = getNodeAZ(node, attrValues);
-        List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new 
ArrayList<>());
-        nodesForAz.add(node);
+      if (availabilityZones.size() == 1) {
+        // If AZs are not being used (all undefined for example) or a single 
AZ exists, we add all
+        // nodes
+        // to the same entry
+        nodesPerAz.put(availabilityZones.iterator().next(), new 
ArrayList<>(candidateNodes));
+      } else {
+        // Get the candidate nodes per AZ in order to build (further down) a 
mapping of AZ to
+        // placement candidates.
+        for (Node node : candidateNodes) {
+          String nodeAz = getNodeAZ(node, attrValues);
+          List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new 
ArrayList<>());
+          nodesForAz.add(node);
+        }
       }
 
+      Comparator<Node> interGroupNodeComparator =
+          new CoresAndDiskComparator(attrValues, coresOnNodes, 
prioritizedFreeDiskGB);
+
       // Build a treeMap sorted by the number of replicas per AZ and including 
candidates nodes
       // suitable for placement on the AZ, so we can easily select the next AZ 
to get a replica
       // assignment and quickly (constant time) decide if placement on this AZ 
is possible or not.
@@ -720,12 +946,17 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
       for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
         azByExistingReplicas
             .computeIfAbsent(azToNumReplicas.get(e.getKey()), k -> new 
HashSet<>())
-            .add(new AzWithNodes(e.getKey(), e.getValue()));
+            .add(
+                new AzWithNodes(
+                    e.getKey(),
+                    e.getValue(),
+                    doSpreadAcrossDomains,
+                    interGroupNodeComparator,
+                    replicaPlacementRandom,
+                    attrValues,
+                    spreadDomainsInUse));
       }
 
-      CoresAndDiskComparator coresAndDiskComparator =
-          new CoresAndDiskComparator(attrValues, coresOnNodes, 
prioritizedFreeDiskGB);
-
       for (int i = 0; i < numReplicas; i++) {
         // We have for each AZ on which we might have a chance of placing a 
replica, the list of
         // candidate nodes for replicas (candidate: does not already have a 
replica of this shard
@@ -745,7 +976,7 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
           Iterator<AzWithNodes> it = mapEntry.getValue().iterator();
           while (it.hasNext()) {
             Map.Entry<Integer, AzWithNodes> entry = 
Map.entry(mapEntry.getKey(), it.next());
-            int numberOfNodes = 
entry.getValue().availableNodesForPlacement.size();
+            int numberOfNodes = entry.getValue().numNodes();
             if (numberOfNodes == 0) {
               it.remove();
             } else { // AZ does have node(s) for placement
@@ -788,38 +1019,20 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
         Node selectedAzBestNode = null;
         for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : 
candidateAzEntries) {
           AzWithNodes azWithNodes = candidateAzEntry.getValue();
-          List<Node> nodes = azWithNodes.availableNodesForPlacement;
-
-          if (!azWithNodes.hasBeenSorted) {
-            // Make sure we do not tend to use always the same nodes (within 
an AZ) if all
-            // conditions are identical (well, this likely is not the case 
since after having added
-            // a replica to a node its number of cores increases for the next 
placement decision,
-            // but let's be defensive here, given that multiple concurrent 
placement decisions might
-            // see the same initial cluster state, and we want placement to be 
reasonable even in
-            // that case without creating an unnecessary imbalance). For 
example, if all nodes have
-            // 0 cores and same amount of free disk space, ideally we want to 
pick a random node for
-            // placement, not always the same one due to some internal 
ordering.
-            Collections.shuffle(nodes, replicaPlacementRandom);
-
-            // Sort by increasing number of cores but pushing nodes with low 
free disk space to the
-            // end of the list
-            nodes.sort(coresAndDiskComparator);
-
-            azWithNodes.hasBeenSorted = true;
-          }
+          azWithNodes.ensureSorted();
 
           // Which one is better, the new one or the previous best?
           if (selectedAz == null
-              || coresAndDiskComparator.compare(nodes.get(0), 
selectedAzBestNode) < 0) {
+              || interGroupNodeComparator.compare(azWithNodes.getBestNode(), 
selectedAzBestNode)
+                  < 0) {
             selectedAz = candidateAzEntry;
-            selectedAzBestNode = nodes.get(0);
+            selectedAzBestNode = azWithNodes.getBestNode();
           }
         }
 
         // Now actually remove the selected node from the winning AZ
         AzWithNodes azWithNodes = selectedAz.getValue();
-        List<Node> nodes = selectedAz.getValue().availableNodesForPlacement;
-        Node assignTarget = nodes.remove(0);
+        Node assignTarget = azWithNodes.removeBestNode();
 
         // Insert back all the qualifying but non winning AZ's removed while 
searching for the one
         for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
@@ -971,5 +1184,29 @@ public class AffinityPlacementFactory implements 
PlacementPluginFactory<Affinity
         return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
       }
     }
+
+    static class SpreadDomainComparator implements 
Comparator<SpreadDomainWithNodes> {
+      private final Map<String, Integer> spreadDomainUsage;
+
+      SpreadDomainComparator(Map<String, Integer> spreadDomainUsage) {
+        this.spreadDomainUsage = spreadDomainUsage;
+      }
+
+      @Override
+      public int compare(SpreadDomainWithNodes group1, SpreadDomainWithNodes 
group2) {
+        // This comparator will compare groups by:
+        // 1. The number of usages for the domain they represent: We want 
groups that are
+        // less used to be the best ones
+        // 2. On equal number of usages, by the internal comparator (which 
uses core count and disk
+        // space) on the best node for each group (which, since the list is 
sorted, it's always the
+        // one in the position 0)
+        Integer usagesLabel1 = 
spreadDomainUsage.getOrDefault(group1.spreadDomainName, 0);
+        Integer usagesLabel2 = 
spreadDomainUsage.getOrDefault(group2.spreadDomainName, 0);
+        if (usagesLabel1.equals(usagesLabel2)) {
+          return group1.compareTo(group2);
+        }
+        return usagesLabel1.compareTo(usagesLabel2);
+      }
+    }
   }
 }
diff --git 
a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
 
b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
index 7f9c48301e9..0e1665116ac 100644
--- 
a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
+++ 
b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -18,6 +18,8 @@
 package org.apache.solr.cluster.placement.plugins;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -1076,40 +1078,63 @@ public class AffinityPlacementFactoryTest extends 
SolrTestCaseJ4 {
     int pullReplicas = TEST_NIGHTLY ? 20 : 2;
 
     log.info("==== numNodes ====");
-    runTestScalability(1000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas);
-    runTestScalability(2000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas);
-    runTestScalability(5000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas);
-    runTestScalability(10000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas);
-    runTestScalability(20000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas);
+    runTestScalability(1000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+    runTestScalability(2000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+    runTestScalability(5000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+    runTestScalability(10000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+    runTestScalability(20000, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
 
     log.info("==== numShards ====");
     int numNodes = TEST_NIGHTLY ? 5000 : 500;
-    runTestScalability(numNodes, 100, nrtReplicas, tlogReplicas, pullReplicas);
-    runTestScalability(numNodes, 200, nrtReplicas, tlogReplicas, pullReplicas);
-    runTestScalability(numNodes, 500, nrtReplicas, tlogReplicas, pullReplicas);
+    runTestScalability(numNodes, 100, nrtReplicas, tlogReplicas, pullReplicas, 
false);
+    runTestScalability(numNodes, 200, nrtReplicas, tlogReplicas, pullReplicas, 
false);
+    runTestScalability(numNodes, 500, nrtReplicas, tlogReplicas, pullReplicas, 
false);
     if (TEST_NIGHTLY) {
-      runTestScalability(numNodes, 1000, nrtReplicas, tlogReplicas, 
pullReplicas);
-      runTestScalability(numNodes, 2000, nrtReplicas, tlogReplicas, 
pullReplicas);
+      runTestScalability(numNodes, 1000, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+      runTestScalability(numNodes, 2000, nrtReplicas, tlogReplicas, 
pullReplicas, false);
     }
 
     log.info("==== numReplicas ====");
-    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 100 : 10, 0, 0);
-    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 200 : 20, 0, 0);
-    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 500 : 50, 0, 0);
-    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 1000 : 30, 0, 0);
-    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 2000 : 50, 0, 0);
+    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 100 : 10, 0, 0, 
false);
+    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 200 : 20, 0, 0, 
false);
+    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 500 : 50, 0, 0, 
false);
+    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 1000 : 30, 0, 0, 
false);
+    runTestScalability(numNodes, numShards, TEST_NIGHTLY ? 2000 : 50, 0, 0, 
false);
+
+    log.info("==== spread domains ====");
+    runTestScalability(numNodes, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, false);
+    runTestScalability(numNodes, numShards, nrtReplicas, tlogReplicas, 
pullReplicas, true);
   }
 
   private void runTestScalability(
-      int numNodes, int numShards, int nrtReplicas, int tlogReplicas, int 
pullReplicas)
+      int numNodes,
+      int numShards,
+      int nrtReplicas,
+      int tlogReplicas,
+      int pullReplicas,
+      boolean useSpreadDomains)
       throws Exception {
     String collectionName = "scaleCollection";
 
+    if (useSpreadDomains) {
+      defaultConfig.spreadAcrossDomains = true;
+      configurePlugin(defaultConfig);
+    } else {
+      defaultConfig.spreadAcrossDomains = false;
+      configurePlugin(defaultConfig);
+    }
+
+    int numSpreadDomains = 5;
+
     Builders.ClusterBuilder clusterBuilder =
         Builders.newClusterBuilder().initializeLiveNodes(numNodes);
     List<Builders.NodeBuilder> nodeBuilders = 
clusterBuilder.getLiveNodeBuilders();
     for (int i = 0; i < numNodes; i++) {
       nodeBuilders.get(i).setCoreCount(0).setFreeDiskGB((double) numNodes);
+      nodeBuilders
+          .get(i)
+          .setSysprop(
+              AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, String.valueOf(i 
% numSpreadDomains));
     }
 
     Builders.CollectionBuilder collectionBuilder = 
Builders.newCollectionBuilder(collectionName);
@@ -1118,6 +1143,8 @@ public class AffinityPlacementFactoryTest extends 
SolrTestCaseJ4 {
     PlacementContext placementContext = clusterBuilder.buildPlacementContext();
     SolrCollection solrCollection = collectionBuilder.build();
     List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+    Set<Node> setNodes = new HashSet<>(liveNodes);
+    assertEquals(setNodes.size(), liveNodes.size());
 
     // Place replicas for all the shards of the (newly created since it has no 
replicas yet)
     // collection
@@ -1138,10 +1165,11 @@ public class AffinityPlacementFactoryTest extends 
SolrTestCaseJ4 {
     final int TOTAL_REPLICAS = numShards * REPLICAS_PER_SHARD;
 
     log.info(
-        "ComputePlacement: {} nodes, {} shards, {} total replicas, elapsed 
time {} ms.",
+        "ComputePlacement: {} nodes, {} shards, {} total replicas, 
spread-domains={}, elapsed time {} ms.",
         numNodes,
         numShards,
         TOTAL_REPLICAS,
+        useSpreadDomains,
         TimeUnit.NANOSECONDS.toMillis(end - start)); // nowarn
     assertEquals(
         "incorrect number of calculated placements",
@@ -1169,16 +1197,228 @@ public class AffinityPlacementFactoryTest extends 
SolrTestCaseJ4 {
     int perNode = TOTAL_REPLICAS > numNodes ? TOTAL_REPLICAS / numNodes : 1;
     replicasPerNode.forEach(
         (node, count) -> {
-          assertEquals(count.get(), perNode);
+          assertEquals(perNode, count.get());
         });
     shardsPerNode.forEach(
         (node, names) -> {
-          assertEquals(names.size(), perNode);
+          assertEquals(perNode, names.size());
         });
 
     replicasPerShard.forEach(
         (shard, count) -> {
-          assertEquals(count.get(), REPLICAS_PER_SHARD);
+          assertEquals(REPLICAS_PER_SHARD, count.get());
         });
   }
+
+  @Test
+  public void testAntiAffinityIsSoft() throws Exception {
+    defaultConfig.spreadAcrossDomains = true;
+    configurePlugin(defaultConfig);
+    String collectionName = "basicCollection";
+
+    Builders.ClusterBuilder clusterBuilder = 
Builders.newClusterBuilder().initializeLiveNodes(2);
+    List<Builders.NodeBuilder> nodeBuilders = 
clusterBuilder.getLiveNodeBuilders();
+    // Anti-affinity can't be achieved, since all nodes in the cluster have 
the same value
+    nodeBuilders
+        .get(0)
+        .setCoreCount(1)
+        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+        .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+    nodeBuilders
+        .get(1)
+        .setCoreCount(2)
+        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+        .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+
+    Builders.CollectionBuilder collectionBuilder = 
Builders.newCollectionBuilder(collectionName);
+    collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+    SolrCollection solrCollection = collectionBuilder.build();
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    {
+      // Place a new replica for the (only) existing shard of the collection
+      PlacementRequestImpl placementRequest =
+          new PlacementRequestImpl(
+              solrCollection,
+              Set.of(solrCollection.shards().iterator().next().getShardName()),
+              new HashSet<>(liveNodes),
+              1,
+              0,
+              0);
+
+      PlacementPlan pp = plugin.computePlacement(placementRequest, 
placementContext);
+
+      assertEquals(1, pp.getReplicaPlacements().size());
+      ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+      assertEquals(liveNodes.get(0), rp.getNode());
+    }
+    {
+      // Place a new replica for the (only) existing shard of the collection
+      PlacementRequestImpl placementRequest =
+          new PlacementRequestImpl(
+              solrCollection,
+              Set.of(solrCollection.shards().iterator().next().getShardName()),
+              new HashSet<>(liveNodes),
+              2,
+              0,
+              0);
+
+      PlacementPlan pp = plugin.computePlacement(placementRequest, 
placementContext);
+
+      assertEquals(2, pp.getReplicaPlacements().size());
+      assertEquals(
+          Set.of(liveNodes.get(0), liveNodes.get(1)),
+          pp.getReplicaPlacements().stream()
+              .map(ReplicaPlacement::getNode)
+              .collect(Collectors.toSet()));
+    }
+  }
+
+  @Test
+  public void testSpreadDomainsWithExistingCollection() throws Exception {
+    testSpreadDomains(true);
+  }
+
+  @Test
+  public void testSpreadDomainsWithEmptyCluster() throws Exception {
+    testSpreadDomains(false);
+  }
+
+  private void testSpreadDomains(boolean hasExistingCollection) throws 
Exception {
+    defaultConfig.spreadAcrossDomains = true;
+    configurePlugin(defaultConfig);
+    String collectionName = "basicCollection";
+
+    Builders.ClusterBuilder clusterBuilder = 
Builders.newClusterBuilder().initializeLiveNodes(3);
+    List<Builders.NodeBuilder> nodeBuilders = 
clusterBuilder.getLiveNodeBuilders();
+    nodeBuilders
+        .get(0)
+        .setCoreCount(1)
+        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+        .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+    nodeBuilders
+        .get(1)
+        .setCoreCount(2)
+        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+        .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "A");
+    nodeBuilders
+        .get(2)
+        .setCoreCount(3)
+        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1))
+        .setSysprop(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP, "B");
+
+    Builders.CollectionBuilder collectionBuilder = 
Builders.newCollectionBuilder(collectionName);
+
+    if (hasExistingCollection) {
+      // Existing collection has replicas for its shards and is visible in the 
cluster state
+      collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
+      clusterBuilder.addCollection(collectionBuilder);
+    } else {
+      // New collection to create has the shards defined but no replicas and 
is not present in
+      // cluster state
+      collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+    }
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+    SolrCollection solrCollection = collectionBuilder.build();
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    {
+      // Place a new replica for the (only) existing shard of the collection
+      PlacementRequestImpl placementRequest =
+          new PlacementRequestImpl(
+              solrCollection,
+              Set.of(solrCollection.shards().iterator().next().getShardName()),
+              new HashSet<>(liveNodes),
+              1,
+              0,
+              0);
+
+      PlacementPlan pp = plugin.computePlacement(placementRequest, 
placementContext);
+
+      assertEquals(1, pp.getReplicaPlacements().size());
+      ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+      assertEquals(hasExistingCollection ? liveNodes.get(2) : 
liveNodes.get(0), rp.getNode());
+    }
+    {
+      // Place a new replica for the (only) existing shard of the collection
+      PlacementRequestImpl placementRequest =
+          new PlacementRequestImpl(
+              solrCollection,
+              Set.of(solrCollection.shards().iterator().next().getShardName()),
+              new HashSet<>(liveNodes),
+              2,
+              0,
+              0);
+
+      PlacementPlan pp = plugin.computePlacement(placementRequest, 
placementContext);
+
+      assertEquals(2, pp.getReplicaPlacements().size());
+      assertEquals(
+          hasExistingCollection
+              ? Set.of(liveNodes.get(1), liveNodes.get(2))
+              : Set.of(liveNodes.get(0), liveNodes.get(2)),
+          pp.getReplicaPlacements().stream()
+              .map(ReplicaPlacement::getNode)
+              .collect(Collectors.toSet()));
+    }
+  }
+
+  @Test
+  @SuppressWarnings("SelfComparison")
+  public void testCompareSpreadDomainWithNodes() {
+    Builders.ClusterBuilder clusterBuilder = 
Builders.newClusterBuilder().initializeLiveNodes(3);
+    final List<Builders.NodeBuilder> nodeBuilders = 
clusterBuilder.getLiveNodeBuilders();
+    nodeBuilders.get(0).setNodeName("nodeA");
+    nodeBuilders.get(1).setNodeName("nodeB");
+    nodeBuilders.get(2).setNodeName("nodeC");
+
+    Cluster cluster = clusterBuilder.build();
+    Node nodeA =
+        cluster.getLiveNodes().stream()
+            .filter((n) -> n.getName().equals("nodeA"))
+            .findFirst()
+            .get();
+    Node nodeB =
+        cluster.getLiveNodes().stream()
+            .filter((n) -> n.getName().equals("nodeB"))
+            .findFirst()
+            .get();
+    Node nodeC =
+        cluster.getLiveNodes().stream()
+            .filter((n) -> n.getName().equals("nodeC"))
+            .findFirst()
+            .get();
+
+    Comparator<Node> nodeComparator = Comparator.comparing(Node::getName);
+    List<Node> listInGroup1 = new ArrayList<>(List.of(nodeC, nodeA));
+    AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes 
group1 =
+        new 
AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
+            "foo", listInGroup1, 0, nodeComparator);
+    AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes 
group2 =
+        new 
AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
+            "bar", List.of(nodeB), 1, nodeComparator);
+    assertEquals("Comparing to itself should return 0", 0, 
group1.compareTo(group1));
+    assertEquals(
+        "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
+        1,
+        group1.compareTo(group2));
+    assertEquals(
+        "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
+        -1,
+        group2.compareTo(group1));
+    listInGroup1.remove(0);
+    assertEquals(
+        "group 1 should be greater, since 'nodeB' is greater than 'nodeA",
+        -1,
+        group1.compareTo(group2));
+    listInGroup1.remove(0);
+    listInGroup1.add(nodeB);
+    assertEquals(
+        "group 1 should be greater because of the tie breaker", -1, 
group1.compareTo(group2));
+  }
 }


Reply via email to