HoustonPutman commented on code in PR #1650:
URL: https://github.com/apache/solr/pull/1650#discussion_r1224379842
##########
solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java:
##########
@@ -251,1006 +231,463 @@ private AffinityPlacementPlugin(
// We make things reproducible in tests by using test seed if any
String seed = System.getProperty("tests.seed");
- if (seed != null) {
- replicaPlacementRandom.setSeed(seed.hashCode());
- }
- }
-
- @Override
- @SuppressForbidden(
- reason =
- "Ordering.arbitrary() has no equivalent in Comparator class.
Rather reuse than copy.")
- public List<PlacementPlan> computePlacements(
- Collection<PlacementRequest> requests, PlacementContext
placementContext)
- throws PlacementException {
- List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
- Set<Node> allNodes = new HashSet<>();
- Set<SolrCollection> allCollections = new HashSet<>();
- for (PlacementRequest request : requests) {
- allNodes.addAll(request.getTargetNodes());
- allCollections.add(request.getCollection());
- }
-
- // Fetch attributes for a superset of all nodes requested amongst the
placementRequests
- AttributeFetcher attributeFetcher =
placementContext.getAttributeFetcher();
- attributeFetcher
-
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
- .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
-
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
- attributeFetcher
- .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES)
- .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB);
- Set<ReplicaMetric<?>> replicaMetrics =
Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB);
- for (SolrCollection collection : allCollections) {
- attributeFetcher.requestCollectionMetrics(collection, replicaMetrics);
- }
- attributeFetcher.fetchFrom(allNodes);
- final AttributeValues attrValues = attributeFetcher.fetchAttributes();
- // Get the number of currently existing cores per node, so we can update
as we place new cores
- // to not end up always selecting the same node(s). This is used across
placement requests
- Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes,
attrValues);
-
- boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes,
attrValues);
-
- // Keep track with nodesWithReplicas across requests
- Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new
HashMap<>();
- for (PlacementRequest request : requests) {
- Set<Node> nodes = request.getTargetNodes();
- SolrCollection solrCollection = request.getCollection();
-
- // filter out nodes that don't meet the `withCollection` constraint
- nodes =
- filterNodesWithCollection(placementContext.getCluster(), request,
attrValues, nodes);
- // filter out nodes that don't match the "node types" specified in the
collection props
- nodes = filterNodesByNodeType(placementContext.getCluster(), request,
attrValues, nodes);
-
- // All available zones of live nodes. Due to some nodes not being
candidates for placement,
- // and some existing replicas being one availability zones that might
be offline (i.e. their
- // nodes are not live), this set might contain zones on which it is
impossible to place
- // replicas. That's ok.
- Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
-
- // Build the replica placement decisions here
- Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
-
- // Let's now iterate on all shards to create replicas for and start
finding home sweet homes
- // for the replicas
- for (String shardName : request.getShardNames()) {
- ReplicaMetrics leaderMetrics =
- attrValues
- .getCollectionMetrics(solrCollection.getName())
- .flatMap(colMetrics -> colMetrics.getShardMetrics(shardName))
- .flatMap(ShardMetrics::getLeaderMetrics)
- .orElse(null);
-
- // Split the set of nodes into 3 sets of nodes accepting each
replica type (sets can
- // overlap
- // if nodes accept multiple replica types). These subsets sets are
actually maps, because
- // we
- // capture the number of cores (of any replica type) present on each
node.
- //
- // This also filters out nodes that will not satisfy the rules if
the replica is placed
- // there
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
- getAvailableNodesForReplicaTypes(nodes, attrValues,
leaderMetrics);
-
- // Inventory nodes (if any) that already have a replica of any type
for the shard, because
- // we can't be placing additional replicas on these. This data
structure is updated after
- // each replica to node assign and is used to make sure different
replica types are not
- // allocated to the same nodes (protecting same node assignments
within a given replica
- // type is done "by construction" in makePlacementDecisions()).
- Set<Node> nodesWithReplicas =
- allNodesWithReplicas
- .computeIfAbsent(solrCollection.getName(), col -> new
HashMap<>())
- .computeIfAbsent(
- shardName,
- s -> {
- Set<Node> newNodeSet = new HashSet<>();
- Shard shard = solrCollection.getShard(s);
- if (shard != null) {
- // Prefill the set with the existing replicas
- for (Replica r : shard.replicas()) {
- newNodeSet.add(r.getNode());
- }
- }
- return newNodeSet;
- });
-
- // Iterate on the replica types in the enum order. We place more
strategic replicas first
- // (NRT is more strategic than TLOG more strategic than PULL). This
is in case we
- // eventually decide that less strategic replica placement
impossibility is not a problem
- // that should lead to replica placement computation failure.
Current code does fail if
- // placement is impossible (constraint is at most one replica of a
shard on any node).
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values())
{
- int numReplicasToCreate =
request.getCountReplicasToCreate(replicaType);
- if (numReplicasToCreate > 0) {
- makePlacementDecisions(
- solrCollection,
- shardName,
- availabilityZones,
- replicaType,
- numReplicasToCreate,
- attrValues,
- leaderMetrics,
- replicaTypeToNodes,
- nodesWithReplicas,
- allCoresOnNodes,
- placementContext.getPlacementPlanFactory(),
- replicaPlacements,
- doSpreadAcrossDomains);
- }
- }
- }
- placementPlans.add(
- placementContext
- .getPlacementPlanFactory()
- .createPlacementPlan(request, replicaPlacements));
- }
-
- return placementPlans;
- }
-
- private boolean shouldSpreadAcrossDomains(Set<Node> allNodes,
AttributeValues attrValues) {
- boolean doSpreadAcrossDomains =
- spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
- if (spreadAcrossDomains && !doSpreadAcrossDomains) {
- log.warn(
- "AffinityPlacementPlugin configured to spread across domains, but
there are nodes in the cluster without the {} system property. Ignoring
spreadAcrossDomains.",
- AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
- }
- return doSpreadAcrossDomains;
- }
-
- private boolean spreadDomainPropPresent(Set<Node> allNodes,
AttributeValues attrValues) {
- // We can only use spread domains if all nodes have the system property
- return allNodes.stream()
- .noneMatch(
- n ->
- attrValues
- .getSystemProperty(n,
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .isEmpty());
}
@Override
- public void verifyAllowedModification(
- ModificationRequest modificationRequest, PlacementContext
placementContext)
- throws PlacementModificationException {
- if (modificationRequest instanceof DeleteShardsRequest) {
- log.warn("DeleteShardsRequest not implemented yet, skipping: {}",
modificationRequest);
- } else if (modificationRequest instanceof DeleteCollectionRequest) {
- verifyDeleteCollection((DeleteCollectionRequest) modificationRequest,
placementContext);
- } else if (modificationRequest instanceof DeleteReplicasRequest) {
- verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest,
placementContext);
- } else {
- log.warn("unsupported request type, skipping: {}",
modificationRequest);
- }
- }
-
- private void verifyDeleteCollection(
+ protected void verifyDeleteCollection(
DeleteCollectionRequest deleteCollectionRequest, PlacementContext
placementContext)
throws PlacementModificationException {
Cluster cluster = placementContext.getCluster();
- Set<String> colocatedCollections =
-
colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(),
Set.of());
- for (String primaryName : colocatedCollections) {
+ Set<String> collocatedCollections =
+
collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(),
Set.of());
+ for (String primaryName : collocatedCollections) {
try {
if (cluster.getCollection(primaryName) != null) {
// still exists
throw new PlacementModificationException(
- "colocated collection "
+ "collocated collection "
+ primaryName
+ " of "
+ deleteCollectionRequest.getCollection().getName()
+ " still present");
}
} catch (IOException e) {
throw new PlacementModificationException(
- "failed to retrieve colocated collection information", e);
+ "failed to retrieve collocated collection information", e);
}
}
}
- private void verifyDeleteReplicas(
- DeleteReplicasRequest deleteReplicasRequest, PlacementContext
placementContext)
- throws PlacementModificationException {
- Cluster cluster = placementContext.getCluster();
- SolrCollection secondaryCollection =
deleteReplicasRequest.getCollection();
- Set<String> colocatedCollections =
colocatedWith.get(secondaryCollection.getName());
- if (colocatedCollections == null) {
- return;
- }
- Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new
HashMap<>();
- secondaryCollection
- .shards()
- .forEach(
- shard ->
- shard
- .replicas()
- .forEach(
- replica ->
- secondaryNodeShardReplicas
- .computeIfAbsent(replica.getNode(), n -> new
HashMap<>())
- .computeIfAbsent(
- replica.getShard().getShardName(), s ->
new AtomicInteger())
- .incrementAndGet()));
+ private static final class AffinityPlacementContext {
+ private final Set<String> allSpreadDomains = new HashSet<>();
+ private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage
= new HashMap<>();
+ private final Set<String> allAvailabilityZones = new HashSet<>();
+ private final Map<String, Map<String, Map<Replica.ReplicaType,
ReplicaSpread>>>
+ availabilityZoneUsage = new HashMap<>();
+ private boolean doSpreadAcrossDomains;
+ }
- // find the colocated-with collections
- Map<Node, Set<String>> colocatingNodes = new HashMap<>();
- try {
- for (String colocatedCollection : colocatedCollections) {
- SolrCollection coll = cluster.getCollection(colocatedCollection);
- coll.shards()
- .forEach(
- shard ->
- shard
- .replicas()
- .forEach(
- replica ->
- colocatingNodes
- .computeIfAbsent(replica.getNode(), n ->
new HashSet<>())
- .add(coll.getName())));
- }
- } catch (IOException ioe) {
- throw new PlacementModificationException(
- "failed to retrieve colocated collection information", ioe);
- }
- PlacementModificationException exception = null;
- for (Replica replica : deleteReplicasRequest.getReplicas()) {
- if (!colocatingNodes.containsKey(replica.getNode())) {
- continue;
- }
- // check that there will be at least one replica remaining
- AtomicInteger secondaryCount =
- secondaryNodeShardReplicas
- .getOrDefault(replica.getNode(), Map.of())
- .getOrDefault(replica.getShard().getShardName(), new
AtomicInteger());
- if (secondaryCount.get() > 1) {
- // we can delete it - record the deletion
- secondaryCount.decrementAndGet();
- continue;
- }
- // fail - this replica cannot be removed
- if (exception == null) {
- exception = new PlacementModificationException("delete replica(s)
rejected");
+ @Override
+ protected Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors)
+ throws PlacementException {
+ // Fetch attributes for a superset of all nodes requested amongst the
placementRequests
+ AttributeFetcher attributeFetcher =
placementContext.getAttributeFetcher();
+ attributeFetcher
+
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+ .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
+
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+ attributeFetcher
+ .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES)
+ .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB);
+ Set<ReplicaMetric<?>> replicaMetrics =
Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB);
+ Set<String> requestedCollections = new HashSet<>();
+ for (SolrCollection collection : relevantCollections) {
+ if (requestedCollections.add(collection.getName())) {
+ attributeFetcher.requestCollectionMetrics(collection,
replicaMetrics);
}
- exception.addRejectedModification(
- replica.toString(),
- "co-located with replicas of " +
colocatingNodes.get(replica.getNode()));
- }
- if (exception != null) {
- throw exception;
}
- }
+ attributeFetcher.fetchFrom(nodes);
+ final AttributeValues attrValues = attributeFetcher.fetchAttributes();
- private Set<String> getZonesFromNodes(Set<Node> nodes, final
AttributeValues attrValues) {
- Set<String> azs = new HashSet<>();
+ AffinityPlacementContext affinityPlacementContext = new
AffinityPlacementContext();
+ affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains;
- for (Node n : nodes) {
- azs.add(getNodeAZ(n, attrValues));
+ Map<Node, WeightedNode> affinityNodeMap =
CollectionUtil.newHashMap(nodes.size());
+ for (Node node : nodes) {
+ AffinityNode affinityNode =
+ newNodeFromMetrics(node, attrValues, affinityPlacementContext,
skipNodesWithErrors);
+ if (affinityNode != null) {
+ affinityNodeMap.put(node, affinityNode);
+ }
}
- return Collections.unmodifiableSet(azs);
+ return affinityNodeMap;
}
- /**
- * Resolves the AZ of a node and takes care of nodes that have no defined
AZ in system property
- * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then
return {@link
- * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
- */
- private String getNodeAZ(Node n, final AttributeValues attrValues) {
- Optional<String> nodeAz =
- attrValues.getSystemProperty(n,
AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP);
- // All nodes with undefined AZ will be considered part of the same AZ.
This also works for
- // deployments that do not care about AZ's
- return
nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
- }
-
- /**
- * This class captures an availability zone and the nodes that are
legitimate targets for
- * replica placement in that Availability Zone. Instances are used as
values in a {@link
- * java.util.TreeMap} in which the total number of already existing
replicas in the AZ is the
- * key. This allows easily picking the set of nodes from which to select a
node for placement in
- * order to balance the number of replicas per AZ. Picking one of the
nodes from the set is done
- * using different criteria unrelated to the Availability Zone (picking
the node is based on the
- * {@link CoresAndDiskComparator} ordering).
- */
- private static class AzWithNodes {
- final String azName;
- private final boolean useSpreadDomains;
- private boolean listIsSorted = false;
- private final Comparator<Node> nodeComparator;
- private final Random random;
- private final List<Node> availableNodesForPlacement;
- private final AttributeValues attributeValues;
- private final ReplicaMetrics leaderMetrics;
- private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
- private final Map<String, Integer> currentSpreadDomainUsageUsage;
- private int numNodesForPlacement;
-
- AzWithNodes(
- String azName,
- List<Node> availableNodesForPlacement,
- boolean useSpreadDomains,
- Comparator<Node> nodeComparator,
- Random random,
- AttributeValues attributeValues,
- ReplicaMetrics leaderMetrics,
- Map<String, Integer> currentSpreadDomainUsageUsage) {
- this.azName = azName;
- this.availableNodesForPlacement = availableNodesForPlacement;
- this.useSpreadDomains = useSpreadDomains;
- this.nodeComparator = nodeComparator;
- this.random = random;
- this.attributeValues = attributeValues;
- this.leaderMetrics = leaderMetrics;
- this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
- this.numNodesForPlacement = availableNodesForPlacement.size();
- }
-
- private boolean hasBeenSorted() {
- return (useSpreadDomains && sortedSpreadDomains != null)
- || (!useSpreadDomains && listIsSorted);
- }
-
- void ensureSorted() {
- if (!hasBeenSorted()) {
- sort();
- }
+ AffinityNode newNodeFromMetrics(
+ Node node,
+ AttributeValues attrValues,
+ AffinityPlacementContext affinityPlacementContext,
+ boolean skipNodesWithErrors)
+ throws PlacementException {
+ Set<Replica.ReplicaType> supportedReplicaTypes =
+ attrValues.getSystemProperty(node,
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream()
+ .flatMap(s -> Arrays.stream(s.split(",")))
+ .map(String::trim)
+ .map(s -> s.toUpperCase(Locale.ROOT))
+ .map(
+ s -> {
+ try {
+ return Replica.ReplicaType.valueOf(s);
+ } catch (IllegalArgumentException e) {
+ log.warn(
+ "Node {} has an invalid value for the {}
systemProperty: {}",
+ node.getName(),
+ AffinityPlacementConfig.REPLICA_TYPE_SYSPROP,
+ s);
+ return null;
+ }
+ })
+ .collect(Collectors.toSet());
+ if (supportedReplicaTypes.isEmpty()) {
+ // If property not defined or is only whitespace on a node, assuming
node can take any
+ // replica type
+ supportedReplicaTypes = Set.of(Replica.ReplicaType.values());
}
- private void sort() {
- assert !listIsSorted && sortedSpreadDomains == null
- : "We shouldn't be sorting this list again";
-
- // Make sure we do not tend to use always the same nodes (within an
AZ) if all
- // conditions are identical (well, this likely is not the case since
after having added
- // a replica to a node its number of cores increases for the next
placement decision,
- // but let's be defensive here, given that multiple concurrent
placement decisions might
- // see the same initial cluster state, and we want placement to be
reasonable even in
- // that case without creating an unnecessary imbalance). For example,
if all nodes have
- // 0 cores and same amount of free disk space, ideally we want to pick
a random node for
- // placement, not always the same one due to some internal ordering.
- Collections.shuffle(availableNodesForPlacement, random);
-
- if (useSpreadDomains) {
- // When we use spread domains, we don't just sort the list of nodes,
instead we generate a
- // TreeSet of SpreadDomainWithNodes,
- // sorted by the number of times the domain has been used. Each
- // SpreadDomainWithNodes internally contains the list of nodes that
belong to that
- // particular domain,
- // and it's sorted internally by the comparator passed to this
- // class (which is the same that's used when not using spread
domains).
- // Whenever a node from a particular SpreadDomainWithNodes is
selected as the best
- // candidate, the call to "removeBestNode" will:
- // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
- // 2. Remove the best node from the list within the
SpreadDomainWithNodes
- // 3. Increment the count of times the domain has been used
- // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if
there are still nodes
- // available
- HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new
HashMap<>();
- for (Node node : availableNodesForPlacement) {
- spreadDomainToListOfNodesMap
- .computeIfAbsent(
- attributeValues
- .getSystemProperty(node,
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .get(),
- k -> new ArrayList<>())
- .add(node);
- }
- sortedSpreadDomains =
- new TreeSet<>(new
SpreadDomainComparator(currentSpreadDomainUsageUsage));
-
- int i = 0;
- for (Map.Entry<String, List<Node>> entry :
spreadDomainToListOfNodesMap.entrySet()) {
- // Sort the nodes within the spread domain by the provided
comparator
- entry.getValue().sort(nodeComparator);
- sortedSpreadDomains.add(
- new SpreadDomainWithNodes(entry.getKey(), entry.getValue(),
i++, nodeComparator));
+ Set<String> nodeType;
+ Optional<String> nodePropOpt =
+ attrValues.getSystemProperty(node,
AffinityPlacementConfig.NODE_TYPE_SYSPROP);
+ if (nodePropOpt.isEmpty()) {
+ nodeType = Collections.emptySet();
+ } else {
+ nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
+ }
+
+ Optional<Double> nodeFreeDiskGB =
+ attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB);
+ Optional<Integer> nodeNumCores =
+ attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES);
+ String az =
+ attrValues
+ .getSystemProperty(node,
AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+ .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+ affinityPlacementContext.allAvailabilityZones.add(az);
+ String spreadDomain;
+ if (affinityPlacementContext.doSpreadAcrossDomains) {
+ spreadDomain =
+ attrValues
+ .getSystemProperty(node,
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+ .orElse(null);
+ if (spreadDomain == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "AffinityPlacementPlugin configured to spread across domains,
but node {} does not have the {} system property. Ignoring
spreadAcrossDomains.",
+ node.getName(),
+ AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
}
+ affinityPlacementContext.doSpreadAcrossDomains = false;
+ affinityPlacementContext.allSpreadDomains.clear();
} else {
- availableNodesForPlacement.sort(nodeComparator);
- listIsSorted = true;
+ affinityPlacementContext.allSpreadDomains.add(spreadDomain);
}
+ } else {
+ spreadDomain = null;
}
-
- Node getBestNode() {
- assert hasBeenSorted();
- if (useSpreadDomains) {
- return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
- } else {
- return availableNodesForPlacement.get(0);
+ if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "Unknown free disk on node {}, excluding it from placement
decisions.",
+ node.getName());
}
- }
-
- public Node removeBestNode() {
- assert hasBeenSorted();
- this.numNodesForPlacement--;
- Node n;
- if (useSpreadDomains) {
- // Since this SpreadDomainWithNodes needs to be re-sorted in the
sortedSpreadDomains, we
- // remove it and then re-add it, once the best node has been removed.
- SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
- n = group.sortedNodesForPlacement.remove(0);
- this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1,
Integer::sum);
- if (!group.sortedNodesForPlacement.isEmpty()) {
- sortedSpreadDomains.add(group);
- }
- } else {
- n = availableNodesForPlacement.remove(0);
+ return null;
+ } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "Unknown number of cores on node {}, excluding it from placement
decisions.",
+ node.getName());
}
- Optional.ofNullable(leaderMetrics)
- .flatMap(lrm ->
lrm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB))
- .ifPresent(
- indexSize ->
- attributeValues.decreaseNodeMetric(
- n, BuiltInMetrics.NODE_FREE_DISK_GB, indexSize));
- attributeValues.increaseNodeMetric(n, BuiltInMetrics.NODE_NUM_CORES,
1);
- return n;
- }
-
- public int numNodes() {
- return this.numNodesForPlacement;
+ return null;
+ } else {
+ return new AffinityNode(
+ node,
+ attrValues,
+ affinityPlacementContext,
+ supportedReplicaTypes,
+ nodeType,
+ nodeNumCores.orElse(0),
+ nodeFreeDiskGB.orElse(0D),
+ az,
+ spreadDomain);
}
}
- /**
- * This class represents group of nodes with the same {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
- */
- static class SpreadDomainWithNodes implements
Comparable<SpreadDomainWithNodes> {
+ private class AffinityNode extends WeightedNode {
- /**
- * This is the label that all nodes in this group have in {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
- */
- final String spreadDomainName;
+ private final AttributeValues attrValues;
- /**
- * The list of all nodes that contain the same {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be
sorted before creating
- * this class.
- */
- private final List<Node> sortedNodesForPlacement;
+ private final AffinityPlacementContext affinityPlacementContext;
- /**
- * This is used for tie breaking the sort of {@link
SpreadDomainWithNodes}, when the
- * nodeComparator between the top nodes of each group return 0.
- */
- private final int tieBreaker;
+ private final Set<Replica.ReplicaType> supportedReplicaTypes;
+ private final Set<String> nodeType;
- /**
- * This is the comparator that is used to compare the top nodes in the
{@link
- * #sortedNodesForPlacement} lists. Must be the same that was used to
sort {@link
- * #sortedNodesForPlacement}.
- */
- private final Comparator<Node> nodeComparator;
+ private int coresOnNode;
+ private double nodeFreeDiskGB;
- public SpreadDomainWithNodes(
- String spreadDomainName,
- List<Node> sortedNodesForPlacement,
- int tieBreaker,
- Comparator<Node> nodeComparator) {
- this.spreadDomainName = spreadDomainName;
- this.sortedNodesForPlacement = sortedNodesForPlacement;
- this.tieBreaker = tieBreaker;
- this.nodeComparator = nodeComparator;
+ private final String availabilityZone;
+ private final String spreadDomain;
+
+ AffinityNode(
+ Node node,
+ AttributeValues attrValues,
+ AffinityPlacementContext affinityPlacementContext,
+ Set<Replica.ReplicaType> supportedReplicaTypes,
+ Set<String> nodeType,
+ int coresOnNode,
+ double nodeFreeDiskGB,
+ String az,
+ String spreadDomain) {
+ super(node);
+ this.attrValues = attrValues;
+ this.affinityPlacementContext = affinityPlacementContext;
+ this.supportedReplicaTypes = supportedReplicaTypes;
+ this.nodeType = nodeType;
+ this.coresOnNode = coresOnNode;
+ this.nodeFreeDiskGB = nodeFreeDiskGB;
+ this.availabilityZone = az;
+ this.spreadDomain = spreadDomain;
}
@Override
- public int compareTo(SpreadDomainWithNodes o) {
- if (o == this) {
- return 0;
- }
- int result =
- nodeComparator.compare(
- this.sortedNodesForPlacement.get(0),
o.sortedNodesForPlacement.get(0));
- if (result == 0) {
- return Integer.compare(this.tieBreaker, o.tieBreaker);
- }
- return result;
+ public int calcWeight() {
+ return coresOnNode
+ + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB <
prioritizedFreeDiskGB ? 1 : 0)
+ + 10000 * getSpreadDomainWeight()
+ + 1000000 * getAZWeight();
}
- }
-
- /**
- * Builds the number of existing cores on each node returned in the
attrValues. Nodes for which
- * the number of cores is not available for whatever reason are excluded
from acceptable
- * candidate nodes as it would not be possible to make any meaningful
placement decisions.
- *
- * @param nodes all nodes on which this plugin should compute placement
- * @param attrValues attributes fetched for the nodes. This method uses
system property {@link
- * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number
of cores on each
- * node.
- */
- private Map<Node, Integer> getCoreCountPerNode(
- Set<Node> nodes, final AttributeValues attrValues) {
- Map<Node, Integer> coresOnNodes = new HashMap<>();
- for (Node node : nodes) {
- attrValues
- .getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES)
- .ifPresent(count -> coresOnNodes.put(node, count));
+ @Override
+ public int calcRelevantWeightWithReplica(Replica replica) {
+ return coresOnNode
+ + 100
+ * (prioritizedFreeDiskGB > 0
+ && nodeFreeDiskGB - getProjectedSizeOfReplica(replica)
+ < prioritizedFreeDiskGB
+ ? 1
+ : 0)
+ + 10000 * projectReplicaSpreadWeight(replica)
+ + 1000000 * projectAZWeight(replica);
}
- return coresOnNodes;
- }
-
- /**
- * Given the set of all nodes on which to do placement and fetched
attributes, builds the sets
- * representing candidate nodes for placement of replicas of each replica
type. These sets are
- * packaged and returned in an EnumMap keyed by replica type. Nodes for
which the number of
- * cores is not available for whatever reason are excluded from acceptable
candidate nodes as it
- * would not be possible to make any meaningful placement decisions.
- *
- * @param nodes all nodes on which this plugin should compute placement
- * @param attrValues attributes fetched for the nodes. This method uses
system property {@link
- * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number
of cores on each
- * node.
- */
- private EnumMap<Replica.ReplicaType, Set<Node>>
getAvailableNodesForReplicaTypes(
- Set<Node> nodes, final AttributeValues attrValues, final
ReplicaMetrics leaderMetrics) {
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
- new EnumMap<>(Replica.ReplicaType.class);
-
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- replicaTypeToNodes.put(replicaType, new HashSet<>());
+ @Override
+ public boolean canAddReplica(Replica replica) {
+ String collection = replica.getShard().getCollection().getName();
+ return
+ // By default, do not allow two replicas of the same shard on a node
+ super.canAddReplica(replica)
+ && supportedReplicaTypes.contains(replica.getType())
+ && Optional.ofNullable(nodeTypes.get(collection))
+ .map(s -> s.stream().anyMatch(nodeType::contains))
+ .orElse(true)
+ && Optional.ofNullable(withCollections.get(collection))
+ .map(this::hasCollectionOnNode)
+ .orElse(true)
+ && (minimalFreeDiskGB <= 0
+ || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) >
minimalFreeDiskGB);
}
- for (Node node : nodes) {
- // Exclude nodes with unknown or too small disk free space
- Optional<Double> nodeFreeDiskGB =
- attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB);
- if (nodeFreeDiskGB.isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Unknown free disk on node {}, excluding it from placement
decisions.",
- node.getName());
- }
- // We rely later on the fact that the free disk optional is present
(see
- // CoresAndDiskComparator), be careful it you change anything here.
- continue;
- }
- double replicaIndexSize =
- Optional.ofNullable(leaderMetrics)
- .flatMap(lm ->
lm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB))
- .orElse(0D);
- double projectedFreeDiskIfPlaced =
- BuiltInMetrics.NODE_FREE_DISK_GB.decrease(nodeFreeDiskGB.get(),
replicaIndexSize);
- if (projectedFreeDiskIfPlaced < minimalFreeDiskGB) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Node {} free disk ({}GB) minus the projected replica size
({}GB) is lower than configured"
- + " minimum {}GB, excluding it from placement decisions.",
- node.getName(),
- nodeFreeDiskGB.get(),
- replicaIndexSize,
- minimalFreeDiskGB);
- }
- continue;
- }
-
- if (attrValues.getNodeMetric(node,
BuiltInMetrics.NODE_NUM_CORES).isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Unknown number of cores on node {}, excluding it from
placement decisions.",
- node.getName());
+ @Override
+ public Map<Replica, String> canRemoveReplicas(Collection<Replica>
replicas) {
+ Map<Replica, String> replicaRemovalExceptions = new HashMap<>();
+ Map<String, Map<String, Set<Replica>>> removals = new HashMap<>();
+ for (Replica replica : replicas) {
+ SolrCollection collection = replica.getShard().getCollection();
+ Set<String> collocatedCollections = new HashSet<>();
+ Optional.ofNullable(collocatedWith.get(collection.getName()))
+ .ifPresent(collocatedCollections::addAll);
+ collocatedCollections.retainAll(getCollectionsOnNode());
+ if (collocatedCollections.isEmpty()) {
+ continue;
}
- // We rely later on the fact that the number of cores optional is
present (see
- // CoresAndDiskComparator), be careful it you change anything here.
- continue;
- }
- String supportedReplicaTypes =
- attrValues
- .getSystemProperty(node,
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
- .isPresent()
- ? attrValues
- .getSystemProperty(node,
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
- .get()
- : null;
- // If property not defined or is only whitespace on a node, assuming
node can take any
- // replica type
- if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- replicaTypeToNodes.get(rt).add(node);
- }
- } else {
- Set<String> acceptedTypes =
- Arrays.stream(supportedReplicaTypes.split(","))
- .map(String::trim)
- .map(s -> s.toLowerCase(Locale.ROOT))
- .collect(Collectors.toSet());
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
- replicaTypeToNodes.get(rt).add(node);
- }
+ // There are collocatedCollections for this shard, so make sure
there is a replica of this
+ // shard left on the node after it is removed
+ Set<Replica> replicasRemovedForShard =
+ removals
+ .computeIfAbsent(
+ replica.getShard().getCollection().getName(), k -> new
HashMap<>())
+ .computeIfAbsent(replica.getShard().getShardName(), k -> new
HashSet<>());
+ replicasRemovedForShard.add(replica);
+
+ if (replicasRemovedForShard.size()
+ >= getReplicasForShardOnNode(replica.getShard()).size()) {
+ replicaRemovalExceptions.put(
+ replica, "co-located with replicas of " +
collocatedCollections);
}
}
+ return replicaRemovalExceptions;
}
- return replicaTypeToNodes;
- }
- /**
- * Picks nodes from {@code targetNodes} for placing {@code numReplicas}
replicas.
- *
- * <p>The criteria used in this method are, in this order:
- *
- * <ol>
- * <li>No more than one replica of a given shard on a given node
(strictly enforced)
- * <li>Balance as much as possible replicas of a given {@link
- * org.apache.solr.cluster.Replica.ReplicaType} over available AZ's.
This balancing takes
- * into account existing replicas <b>of the corresponding replica
type</b>, if any.
- * <li>Place replicas if possible on nodes having more than a certain
amount of free disk
- * space (note that nodes with a too small amount of free disk space
were eliminated as
- * placement targets earlier, in {@link
#getAvailableNodesForReplicaTypes(Set,
- * AttributeValues, ReplicaMetrics)}). There's a threshold here
rather than sorting on the
- * amount of free disk space, because sorting on that value would in
practice lead to
- * never considering the number of cores on a node.
- * <li>Place replicas on nodes having a smaller number of cores (the
number of cores
- * considered for this decision includes previous placement
decisions made during the
- * processing of the placement request)
- * </ol>
- */
- @SuppressForbidden(
- reason =
- "Ordering.arbitrary() has no equivalent in Comparator class.
Rather reuse than copy.")
- private void makePlacementDecisions(
- SolrCollection solrCollection,
- String shardName,
- Set<String> availabilityZones,
- Replica.ReplicaType replicaType,
- int numReplicas,
- final AttributeValues attrValues,
- final ReplicaMetrics leaderMetrics,
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes,
- Set<Node> nodesWithReplicas,
- Map<Node, Integer> coresOnNodes,
- PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements,
- boolean doSpreadAcrossDomains)
- throws PlacementException {
- // Count existing replicas per AZ. We count only instances of the type
of replica for which we
- // need to do placement. If we ever want to balance replicas of any type
across AZ's (and not
- // each replica type balanced independently), we'd have to move this
data structure to the
- // caller of this method so it can be reused across different replica
type placements for a
- // given shard. Note then that this change would be risky. For example
all NRT's and PULL
- // replicas for a shard my be correctly balanced over three AZ's, but
then all NRT can end up
- // in the same AZ...
- Map<String, Integer> azToNumReplicas = new HashMap<>();
- for (String az : availabilityZones) {
- azToNumReplicas.put(az, 0);
+ @Override
+ protected boolean addProjectedReplicaWeights(Replica replica) {
+ nodeFreeDiskGB -= getProjectedSizeOfReplica(replica);
+ coresOnNode += 1;
+ return addReplicaToAzAndSpread(replica);
}
- // Build the set of candidate nodes for the placement, i.e. nodes that
can accept the replica
- // type
- Set<Node> candidateNodes = new
HashSet<>(replicaTypeToNodes.get(replicaType));
- // Remove nodes that already have a replica for the shard (no two
replicas of same shard can
- // be put on same node)
- candidateNodes.removeAll(nodesWithReplicas);
+ @Override
+ protected void initReplicaWeights(Replica replica) {
+ addReplicaToAzAndSpread(replica);
+ }
- // This Map will include the affinity labels for the nodes that are
currently hosting replicas
- // of this shard. It will be modified with new placement decisions.
- Map<String, Integer> spreadDomainsInUse = new HashMap<>();
- Shard shard = solrCollection.getShard(shardName);
- if (shard != null) {
- // shard is non null if we're adding replicas to an already existing
collection.
- // If we're creating the collection, the shards do not exist yet.
- for (Replica replica : shard.replicas()) {
- // The node's AZ is counted as having a replica if it has a replica
of the same type as
- // the one we need to place here.
- if (replica.getType() == replicaType) {
- final String az = getNodeAZ(replica.getNode(), attrValues);
- if (azToNumReplicas.containsKey(az)) {
- // We do not count replicas on AZ's for which we don't have any
node to place on
- // because it would not help the placement decision. If we did
want to do that, note
- // the dereferencing below can't be assumed as the entry will
not exist in the map.
- azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
- }
- if (doSpreadAcrossDomains) {
- attrValues
- .getSystemProperty(
- replica.getNode(),
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .ifPresent(nodeDomain ->
spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum));
- }
- }
+ private boolean addReplicaToAzAndSpread(Replica replica) {
+ boolean needsResort = false;
+ needsResort |=
Review Comment:
   Instead, I only calculated the azSpread if there is more than one
availabilityZone.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]