Ethanlm commented on a change in pull request #3379:
URL: https://github.com/apache/storm/pull/3379#discussion_r580371328



##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
##########
@@ -384,14 +391,29 @@ public double 
calculateMinPercentageUsedBy(NormalizedResources used, double tota
 
     /**
      * If a node or rack has a kind of resource not in a request, make that 
resource negative so when sorting that node or rack will
-     * be less likely to be selected.
+     * be less likely to be selected. If the resource is in the request, make 
that resource positive.
      * @param request the requested resources.
      */
     public void updateForRareResourceAffinity(NormalizedResources request) {
         int length = Math.min(this.otherResources.length, 
request.otherResources.length);
         for (int i = 0; i < length; i++) {
-            if (request.getResourceAt(i) == 0.0) {
-                this.otherResources[i] = -1 * this.otherResources[i];
+            if (request.getResourceAt(i) == 0.0 && this.otherResources[i] > 
0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make 
negative
+            } else if (request.getResourceAt(i) > 0.0 && 
this.otherResources[i] < 0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make 
positive
+            }
+        }
+    }
+
+    /**
+     * If the resource is negative, then make that resource positive.
+     * This will undo effects of {@link 
#updateForRareResourceAffinity(NormalizedResources)}.
+     */
+    public void revertUpdatesForRareResourceAffinity() {

Review comment:
       Can we remove this? It doesn't seem to be used anywhere 

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
##########
@@ -384,14 +391,29 @@ public double 
calculateMinPercentageUsedBy(NormalizedResources used, double tota
 
     /**
      * If a node or rack has a kind of resource not in a request, make that 
resource negative so when sorting that node or rack will
-     * be less likely to be selected.
+     * be less likely to be selected. If the resource is in the request, make 
that resource positive.
      * @param request the requested resources.
      */
     public void updateForRareResourceAffinity(NormalizedResources request) {
         int length = Math.min(this.otherResources.length, 
request.otherResources.length);
         for (int i = 0; i < length; i++) {
-            if (request.getResourceAt(i) == 0.0) {
-                this.otherResources[i] = -1 * this.otherResources[i];
+            if (request.getResourceAt(i) == 0.0 && this.otherResources[i] > 
0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make 
negative
+            } else if (request.getResourceAt(i) > 0.0 && 
this.otherResources[i] < 0.0) {

Review comment:
       Can you please help me understand why is this change necessary? Did you 
find some bugs here? Thanks

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+    // instance variables from class instantiation
+    protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+    protected Cluster cluster;
+    protected TopologyDetails topologyDetails;
+
+    // Instance variables derived from Cluster.
+    private final Map<String, List<String>> networkTopography;
+    private final Map<String, String> superIdToRack = new HashMap<>();
+    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+    private final Map<String, String> nodeIdToHostname = new HashMap<>();
+    private final Map<String, Set<String>> rackIdToHosts;
+    protected List<String> greyListedSupervisorIds;
+
+    // Instance variables from Cluster and TopologyDetails.
+    protected List<String> favoredNodeIds;
+    protected List<String> unFavoredNodeIds;
+
+    // Updated in prepare method
+    ExecutorDetails exec;
+
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails) {
+        this(cluster, topologyDetails, 
BaseResourceAwareStrategy.NodeSortType.COMMON);
+    }
+
+    /**
+     * Initialize for the default implementation node sorting.
+     *
+     * <p>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, 
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting 
implemented in
+     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     * </p>
+     *
+     * @param cluster for which nodes will be sorted.
+     * @param topologyDetails the topology to sort for.
+     * @param nodeSortType type of sorting to be applied to object resource 
collection {@link BaseResourceAwareStrategy.NodeSortType}.
+     */
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+        this.cluster = cluster;
+        this.topologyDetails = topologyDetails;
+        this.nodeSortType = nodeSortType;
+
+        // from Cluster
+        networkTopography = cluster.getNetworkTopography();
+        greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+        Map<String, String> hostToRack = cluster.getHostToRack();
+        RasNodes nodes = new RasNodes(cluster);
+        for (RasNode node: nodes.getNodes()) {
+            String superId = node.getId();
+            String hostName = node.getHostname();
+            if (hostName == null) {
+                // ignore supervisors on blacklistedHosts
+                continue;
+            }
+            String rackId = hostToRack.getOrDefault(hostName, 
DNSToSwitchMapping.DEFAULT_RACK);
+            superIdToRack.put(superId, rackId);
+            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new 
ArrayList<>()).add(node);
+            nodeIdToHostname.put(superId, hostName);
+        }
+        rackIdToHosts = computeRackToHosts(cluster, superIdToRack);
+
+        // from TopologyDetails
+        Map<String, Object> topoConf = topologyDetails.getConf();
+
+        // From Cluster and TopologyDetails - and cleaned-up
+        favoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        unFavoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+        favoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(favoredNodeIds);
+    }
+
+    @VisibleForTesting
+    public Map<String, Set<String>> getRackIdToHosts() {
+        return rackIdToHosts;
+    }
+
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
+    /**
+     * Get a key corresponding to this executor resource request. The key 
contains all non-zero resources requested by
+     * the executor.
+     *
+     * <p>This key can be used to retrieve pre-sorted list of 
racks/hosts/nodes. So executors with similar set of
+     * rare resource request will be reuse the same cached list instead of 
creating a new sorted list of nodes.</p>
+     *
+     * @param exec executor whose requested resources are being examined.
+     * @return a set of resource names that have values greater that zero.
+     */
+    public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {

Review comment:
       Seems not used anywhere. Can we delete it?

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+    // instance variables from class instantiation
+    protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+    protected Cluster cluster;
+    protected TopologyDetails topologyDetails;
+
+    // Instance variables derived from Cluster.
+    private final Map<String, List<String>> networkTopography;
+    private final Map<String, String> superIdToRack = new HashMap<>();
+    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+    private final Map<String, String> nodeIdToHostname = new HashMap<>();
+    //private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
+    private final Map<String, List<String>> rackIdToHosts = new HashMap<>();
+    protected List<String> greyListedSupervisorIds;
+
+    // Instance variables from Cluster and TopologyDetails.
+    protected List<String> favoredNodeIds;
+    protected List<String> unFavoredNodeIds;
+
+    // Updated in prepare method
+    ExecutorDetails exec;
+
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails) {
+        this(cluster, topologyDetails, 
BaseResourceAwareStrategy.NodeSortType.COMMON);
+    }
+
+    /**
+     * Initialize for the default implementation node sorting.
+     *
+     * <p>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, 
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting 
implemented in
+     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     * </p>
+     *
+     * @param cluster for which nodes will be sorted.
+     * @param topologyDetails the topology to sort for.
+     * @param nodeSortType type of sorting to be applied to object resource 
collection {@link BaseResourceAwareStrategy.NodeSortType}.
+     */
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+        this.cluster = cluster;
+        this.topologyDetails = topologyDetails;
+        this.nodeSortType = nodeSortType;
+
+        // from Cluster
+        networkTopography = cluster.getNetworkTopography();
+        Map<String, String> hostToRack = cluster.getHostToRack();
+        RasNodes nodes = new RasNodes(cluster);
+        for (RasNode node: nodes.getNodes()) {
+            String superId = node.getId();
+            String hostName = node.getHostname();
+            String rackId = hostToRack.getOrDefault(hostName, 
DNSToSwitchMapping.DEFAULT_RACK);
+            superIdToRack.put(superId, rackId);
+            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new 
ArrayList<>()).add(node);
+            nodeIdToHostname.put(superId, hostName);
+            rackIdToHosts.computeIfAbsent(rackId, id -> new 
ArrayList<>()).add(hostName);
+        }
+        this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+
+        // from TopologyDetails
+        Map<String, Object> topoConf = topologyDetails.getConf();
+
+        // From Cluster and TopologyDetails - and cleaned-up
+        favoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        unFavoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+        favoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(favoredNodeIds);
+    }
+
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
+    /**
+     * Get a key corresponding to this executor resource request. The key 
contains all non-zero resources requested by
+     * the executor.
+     *
+     * <p>This key can be used to retrieve pre-sorted list of 
racks/hosts/nodes. So executors with similar set of
+     * rare resource request will be reuse the same cached list instead of 
creating a new sorted list of nodes.</p>
+     *
+     * @param exec executor whose requested resources are being examined.
+     * @return a set of resource names that have values greater that zero.
+     */
+    public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {
+        NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
+        Set<String> retVal = new HashSet<>();
+        requestedResources.toNormalizedMap().entrySet().forEach(
+            e -> {
+                if (e.getValue() > 0.0) {
+                    retVal.add(e.getKey());
+                }
+            });
+        return retVal;
+    }
+
+    /**
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can altered by setting {@link 
#nodeSortType}.
+     *
+     * @param resourcesSummary     contains all individual {@link 
ObjectResourcesItem} as well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    protected Iterable<ObjectResourcesItem> sortObjectResources(
+            ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, 
ExistingScheduleFunc existingScheduleFunc) {
+        switch (nodeSortType) {
+            case DEFAULT_RAS:
+                return sortObjectResourcesDefault(resourcesSummary, 
existingScheduleFunc);
+            case GENERIC_RAS:
+                return sortObjectResourcesGeneric(resourcesSummary, exec, 
existingScheduleFunc);
+            case COMMON:
+                return sortObjectResourcesCommon(resourcesSummary, exec, 
existingScheduleFunc);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Sort objects by the following three criteria.
+     *
+     * <li>
+     *     The number executors of the topology that needs to be scheduled is 
already on the object (node or rack)
+     *     in descending order. The reasoning to sort based on criterion 1 is 
so we schedule the rest of a topology on
+     *     the same object (node or rack) as the existing executors of the 
topology.
+     * </li>
+     *
+     * <li>
+     *     The subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     *     resource availability percentage by dividing the resource 
availability of the object (node or rack) by the
+     *     resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     *     How this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack if it is
+     *     requested by the executor that the sorting is being done for and 
pulls it down if it is not.
+     *     By doing this calculation, objects (node or rack) that have 
exhausted or little of one of the resources mentioned
+     *     above will be ranked after racks that have more balanced resource 
availability and nodes or racks that have
+     *     resources that are not requested will be ranked below . So we will 
be less likely to pick a rack that
+     *     have a lot of one resource but a low amount of another and have a 
lot of resources that are not requested by the executor.
+     *     This is similar to logic used {@link 
#sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, 
ExistingScheduleFunc)}.
+     * </li>
+     *
+     * <li>
+     *     The tie between two nodes with same resource availability is broken 
by using the node with lower minimum
+     *     percentage used. This comparison was used in {@link 
#sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+     *     but here it is made subservient to modified resource availbility 
used in
+     *     {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, ExistingScheduleFunc)}.
+     *
+     * </li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+            final ObjectResourcesSummary allResources, final ExecutorDetails 
exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        // Copy and modify allResources
+        ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+        final NormalizedResourceRequest requestedResources = (exec != null) ? 
topologyDetails.getTotalResources(exec) : null;
+        affinityBasedAllResources.getObjectResources().forEach(
+            x -> {
+                if (requestedResources != null) {
+                    // negate unrequested resources
+                    
x.availableResources.updateForRareResourceAffinity(requestedResources);
+                }
+                x.minResourcePercent = 
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+                x.avgResourcePercent = 
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+                LOG.trace("for {}: minResourcePercent={}, 
avgResourcePercent={}, numExistingSchedule={}",
+                        x.id, x.minResourcePercent, x.avgResourcePercent,
+                        existingScheduleFunc.getNumExistingSchedule(x.id));
+            }
+        );
+
+        // Use the following comparator to sort
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet(comparator);
+        
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is 
already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing 
executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability 
of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     * How this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack if it is
+     * requested by the executor that the sorting is being done for and pulls 
it down if it is not.
+     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource 
availability and nodes or racks that have
+     * resources that are not requested will be ranked below . So we will be 
less likely to pick a rack that
+     * have a lot of one resource but a low amount of another and have a lot 
of resources that are not requested by the executor.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+            final ObjectResourcesSummary allResources, ExecutorDetails exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
+        NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
+        affinityBasedAllResources.getObjectResources()
+                .forEach(x -> 
x.availableResources.updateForRareResourceAffinity(requestedResources));
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+            double o2Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
+        
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is 
already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing 
executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability 
of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource 
availability. So we will be less likely to pick
+     * a rack that have a lot of one resource but a low amount of another.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+            final ObjectResourcesSummary allResources,
+            final ExistingScheduleFunc existingScheduleFunc) {
+
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+        for (ObjectResourcesItem objectResources : 
allResources.getObjectResources()) {
+            objectResources.minResourcePercent =
+                    
availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+            objectResources.avgResourcePercent =
+                    
availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+            LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, 
numExistingSchedule={}",
+                    objectResources.id, objectResources.minResourcePercent, 
objectResources.avgResourcePercent,
+                    
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+        }
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
+        sortedObjectResources.addAll(allResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled 
is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a 
node in descending
+     * order We calculate the resource availability percentage by dividing the 
resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the 
resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availHosts a list of all the hosts we want to sort
+     * @param rackId     the rack id availNodes are a part of
+     * @return an iterable of sorted hosts.
+     */
+    private Iterable<ObjectResourcesItem> sortHosts(
+            List<String> availHosts, ExecutorDetails exec, String rackId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary rackResourcesSummary = new 
ObjectResourcesSummary("RACK");
+        availHosts.forEach(h -> {
+            ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+            for (RasNode x : hostnameToNodes.get(h)) {
+                hostItem.add(new ObjectResourcesItem(x.getId(), 
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+            }
+            rackResourcesSummary.addObjectResourcesItem(hostItem);
+        });
+
+        LOG.debug(
+                "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+                rackId,
+                rackResourcesSummary.getAvailableResourcesOverall(),
+                rackResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            rackResourcesSummary,
+            exec,
+            (hostId) -> {
+                AtomicInteger count = scheduledCount.get(hostId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled 
is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a 
node in descending
+     * order We calculate the resource availability percentage by dividing the 
resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the 
resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availRasNodes a list of all the nodes we want to sort
+     * @param hostId     the host-id that availNodes are a part of
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for 
nodes.
+     */
+    private Iterable<ObjectResourcesItem> sortNodes(
+            List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary hostResourcesSummary = new 
ObjectResourcesSummary("HOST");
+        availRasNodes.forEach(x ->
+                hostResourcesSummary.addObjectResourcesItem(
+                        new ObjectResourcesItem(x.getId(), 
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
+                )
+        );
+
+        LOG.debug(
+            "Host {}: Overall Avail [ {} ] Total [ {} ]",
+            hostId,
+            hostResourcesSummary.getAvailableResourcesOverall(),
+            hostResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            hostResourcesSummary,
+            exec,
+            (superId) -> {
+                AtomicInteger count = scheduledCount.get(superId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    protected List<String> makeHostToNodeIds(List<String> hosts) {
+        if (hosts == null) {
+            return Collections.emptyList();
+        }
+        List<String> ret = new ArrayList<>(hosts.size());
+        for (String host: hosts) {
+            List<RasNode> nodes = hostnameToNodes.get(host);
+            if (nodes != null) {
+                for (RasNode node : nodes) {
+                    ret.add(node.getId());
+                }
+            }
+        }
+        return ret;
+    }
+
+    private class LazyNodeSortingIterator implements Iterator<String> {
+        private final LazyNodeSorting parent;
+        private final Iterator<ObjectResourcesItem> rackIterator;
+        private Iterator<ObjectResourcesItem> hostIterator;
+        private Iterator<ObjectResourcesItem> nodeIterator;
+        private String nextValueFromNode = null;
+        private final Iterator<String> pre;
+        private final Iterator<String> post;
+        private final Set<String> skip;
+
+        LazyNodeSortingIterator(LazyNodeSorting parent, 
Iterable<ObjectResourcesItem> sortedRacks) {
+            this.parent = parent;
+            rackIterator = sortedRacks.iterator();
+            pre = favoredNodeIds.iterator();
+            post = Stream.concat(unFavoredNodeIds.stream(), 
greyListedSupervisorIds.stream())
+                            .collect(Collectors.toList())
+                            .iterator();
+            skip = parent.skippedNodeIds;
+        }
+
+        private Iterator<ObjectResourcesItem> getNodeIterator() {
+            if (nodeIterator != null && nodeIterator.hasNext()) {
+                return nodeIterator;
+            }
+            //need to get the next host/node iterator
+            if (hostIterator != null && hostIterator.hasNext()) {
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+                return nodeIterator;
+            }
+            if (rackIterator.hasNext()) {
+                ObjectResourcesItem rack = rackIterator.next();
+                final String rackId = rack.id;
+                hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+                return nodeIterator;
+            }
+
+            return null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (pre.hasNext()) {
+                return true;
+            }
+            if (nextValueFromNode != null) {
+                return true;
+            }
+            while (true) {
+                //For the node we don't know if we have another one unless we 
look at the contents
+                Iterator<ObjectResourcesItem> nodeIterator = getNodeIterator();
+                if (nodeIterator == null || !nodeIterator.hasNext()) {
+                    break;
+                }
+                String tmp = nodeIterator.next().id;
+                if (!skip.contains(tmp)) {
+                    nextValueFromNode = tmp;
+                    return true;
+                }
+            }
+            return post.hasNext();
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            if (pre.hasNext()) {
+                return pre.next();
+            }
+            if (nextValueFromNode != null) {
+                String tmp = nextValueFromNode;
+                nextValueFromNode = null;
+                return tmp;
+            }
+            return post.next();
+        }
+    }
+
+    private class LazyNodeSorting implements Iterable<String> {
+        private final Map<String, AtomicInteger> perHostScheduledCount = new 
HashMap<>();
+        private final Map<String, AtomicInteger> perNodeScheduledCount = new 
HashMap<>();
+        private final Iterable<ObjectResourcesItem> sortedRacks;
+        private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts = 
new HashMap<>();
+        private final Map<String, Iterable<ObjectResourcesItem>> 
cachedNodesByHost = new HashMap<>();
+        private final ExecutorDetails exec;
+        private final Set<String> skippedNodeIds = new HashSet<>();
+
+        LazyNodeSorting(ExecutorDetails exec) {
+            this.exec = exec;
+            skippedNodeIds.addAll(favoredNodeIds);
+            skippedNodeIds.addAll(unFavoredNodeIds);
+            skippedNodeIds.addAll(greyListedSupervisorIds);
+
+            String topoId = topologyDetails.getId();
+            SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+            if (assignment != null) {
+                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                    assignment.getSlotToExecutors().entrySet()) {
+                    String superId = entry.getKey().getNodeId();
+                    String hostId = nodeIdToHostname.get(superId);
+                    perHostScheduledCount.computeIfAbsent(hostId, id -> new 
AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
+                    perNodeScheduledCount.computeIfAbsent(superId, id -> new 
AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
+                }
+            }
+            sortedRacks = getSortedRacks();
+        }
+
+        private Iterable<ObjectResourcesItem> getSortedHostsForRack(String 
rackId) {
+            return cachedHosts.computeIfAbsent(rackId,
+                id -> sortHosts(rackIdToHosts.getOrDefault(id, 
Collections.emptyList()), exec, id, perHostScheduledCount));
+        }
+
+        private Iterable<ObjectResourcesItem> getSortedNodesForHost(String 
hostId) {
+            return cachedNodesByHost.computeIfAbsent(hostId,
+                id -> sortNodes(hostnameToNodes.getOrDefault(id, 
Collections.emptyList()), exec, id, perNodeScheduledCount));
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return new LazyNodeSortingIterator(this, sortedRacks);
+        }
+    }
+
+    @Override
+    public Iterable<String> sortAllNodes() {
+        //LOG.info("sortAllNodes():cachedLazyNodeIterators.size={}, 
execRequestResourceKey={}",
+        //            cachedLazyNodeIterators.size(), execRequestResourcesKey);
+        //return 
cachedLazyNodeIterators.computeIfAbsent(execRequestResourcesKey, k -> new 
LazyNodeSorting(exec));
+        return new LazyNodeSorting(exec);
+    }
+
+    private ObjectResourcesSummary createClusterSummarizedResources() {
+        ObjectResourcesSummary clusterResourcesSummary = new 
ObjectResourcesSummary("Cluster");
+
+        //This is the first time so initialize the resources.
+        for (Map.Entry<String, List<String>> entry : 
networkTopography.entrySet()) {
+            String rackId = entry.getKey();
+            List<String> nodeHosts = entry.getValue();
+            ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+            for (String nodeHost : nodeHosts) {
+                for (RasNode node : hostnameToNodes(nodeHost)) {
+                    
rack.availableResources.add(node.getTotalAvailableResources());
+                    rack.totalResources.add(node.getTotalAvailableResources());

Review comment:
       Should this be `node.getTotalResources()`?

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
##########
@@ -339,4 +354,31 @@ boolean wouldFit(
      * Get the nimbus configuration.
      */
     Map<String, Object> getConf();
+
+    /**
+     * Determine the list of racks on which topologyIds have been assigned. 
Note that the returned set
+     * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link 
#getHostToRack()} is null or
+     * does not contain the assigned host.
+     *
+     * @param topologyIds for which assignments are examined.
+     * @return set of racks on which assignments have been made.
+     */
+    default Set<String> getAssignedRacks(String... topologyIds) {

Review comment:
       This seems only used in test cases. Do we want to move it into some test 
classes, instead of adding it in the interface? So the interface can be smaller 
and developers don't need to worry about this method

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+    // instance variables from class instantiation
+    protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+    protected Cluster cluster;
+    protected TopologyDetails topologyDetails;
+
+    // Instance variables derived from Cluster.
+    private final Map<String, List<String>> networkTopography;
+    private final Map<String, String> superIdToRack = new HashMap<>();
+    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+    private final Map<String, String> nodeIdToHostname = new HashMap<>();
+    private final Map<String, Set<String>> rackIdToHosts;
+    protected List<String> greyListedSupervisorIds;
+
+    // Instance variables from Cluster and TopologyDetails.
+    protected List<String> favoredNodeIds;
+    protected List<String> unFavoredNodeIds;
+
+    // Updated in prepare method
+    ExecutorDetails exec;
+
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails) {
+        this(cluster, topologyDetails, 
BaseResourceAwareStrategy.NodeSortType.COMMON);
+    }
+
+    /**
+     * Initialize for the default implementation node sorting.
+     *
+     * <p>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting 
implemented in
+     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, 
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting 
implemented in
+     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     * </p>
+     *
+     * @param cluster for which nodes will be sorted.
+     * @param topologyDetails the topology to sort for.
+     * @param nodeSortType type of sorting to be applied to object resource 
collection {@link BaseResourceAwareStrategy.NodeSortType}.
+     */
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+        this.cluster = cluster;
+        this.topologyDetails = topologyDetails;
+        this.nodeSortType = nodeSortType;
+
+        // from Cluster
+        networkTopography = cluster.getNetworkTopography();
+        greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+        Map<String, String> hostToRack = cluster.getHostToRack();
+        RasNodes nodes = new RasNodes(cluster);
+        for (RasNode node: nodes.getNodes()) {
+            String superId = node.getId();
+            String hostName = node.getHostname();
+            if (hostName == null) {
+                // ignore supervisors on blacklistedHosts
+                continue;
+            }
+            String rackId = hostToRack.getOrDefault(hostName, 
DNSToSwitchMapping.DEFAULT_RACK);
+            superIdToRack.put(superId, rackId);
+            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new 
ArrayList<>()).add(node);
+            nodeIdToHostname.put(superId, hostName);
+        }
+        rackIdToHosts = computeRackToHosts(cluster, superIdToRack);
+
+        // from TopologyDetails
+        Map<String, Object> topoConf = topologyDetails.getConf();
+
+        // From Cluster and TopologyDetails - and cleaned-up
+        favoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        unFavoredNodeIds = makeHostToNodeIds((List<String>) 
topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+        favoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(favoredNodeIds);
+    }
+
+    @VisibleForTesting
+    public Map<String, Set<String>> getRackIdToHosts() {
+        return rackIdToHosts;
+    }
+
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
+    /**
+     * Get a key corresponding to this executor resource request. The key 
contains all non-zero resources requested by
+     * the executor.
+     *
+     * <p>This key can be used to retrieve pre-sorted list of 
racks/hosts/nodes. So executors with similar set of
+     * rare resource request will be reuse the same cached list instead of 
creating a new sorted list of nodes.</p>
+     *
+     * @param exec executor whose requested resources are being examined.
+     * @return a set of resource names that have values greater that zero.
+     */
+    public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {
+        NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
+        Set<String> retVal = new HashSet<>();
+        requestedResources.toNormalizedMap().entrySet().forEach(
+            e -> {
+                if (e.getValue() > 0.0) {
+                    retVal.add(e.getKey());
+                }
+            });
+        return retVal;
+    }
+
+    /**
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can altered by setting {@link 
#nodeSortType}.
+     *
+     * @param resourcesSummary     contains all individual {@link 
ObjectResourcesItem} as well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    protected Iterable<ObjectResourcesItem> sortObjectResources(
+            ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, 
ExistingScheduleFunc existingScheduleFunc) {
+        switch (nodeSortType) {
+            case DEFAULT_RAS:
+                return sortObjectResourcesDefault(resourcesSummary, 
existingScheduleFunc);
+            case GENERIC_RAS:
+                return sortObjectResourcesGeneric(resourcesSummary, exec, 
existingScheduleFunc);
+            case COMMON:
+                return sortObjectResourcesCommon(resourcesSummary, exec, 
existingScheduleFunc);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Sort objects by the following three criteria.
+     *
+     * <li>
+     *     The number executors of the topology that needs to be scheduled is 
already on the object (node or rack)
+     *     in descending order. The reasoning to sort based on criterion 1 is 
so we schedule the rest of a topology on
+     *     the same object (node or rack) as the existing executors of the 
topology.
+     * </li>
+     *
+     * <li>
+     *     The subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     *     resource availability percentage by dividing the resource 
availability of the object (node or rack) by the
+     *     resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     *     How this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack if it is
+     *     requested by the executor that the sorting is being done for and 
pulls it down if it is not.
+     *     By doing this calculation, objects (node or rack) that have 
exhausted or little of one of the resources mentioned
+     *     above will be ranked after racks that have more balanced resource 
availability and nodes or racks that have
+     *     resources that are not requested will be ranked below . So we will 
be less likely to pick a rack that
+     *     have a lot of one resource but a low amount of another and have a 
lot of resources that are not requested by the executor.
+     *     This is similar to logic used {@link 
#sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, 
ExistingScheduleFunc)}.
+     * </li>
+     *
+     * <li>
+     *     The tie between two nodes with same resource availability is broken 
by using the node with lower minimum
+     *     percentage used. This comparison was used in {@link 
#sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+     *     but here it is made subservient to modified resource availbility 
used in
+     *     {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, ExistingScheduleFunc)}.
+     *
+     * </li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+            final ObjectResourcesSummary allResources, final ExecutorDetails 
exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        // Copy and modify allResources
+        ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+        final NormalizedResourceRequest requestedResources = (exec != null) ? 
topologyDetails.getTotalResources(exec) : null;
+        affinityBasedAllResources.getObjectResources().forEach(
+            x -> {
+                if (requestedResources != null) {
+                    // negate unrequested resources
+                    
x.availableResources.updateForRareResourceAffinity(requestedResources);
+                }
+                x.minResourcePercent = 
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+                x.avgResourcePercent = 
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+                LOG.trace("for {}: minResourcePercent={}, 
avgResourcePercent={}, numExistingSchedule={}",
+                        x.id, x.minResourcePercent, x.avgResourcePercent,
+                        existingScheduleFunc.getNumExistingSchedule(x.id));
+            }
+        );
+
+        // Use the following comparator to sort
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet(comparator);
+        
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is 
already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing 
executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability 
of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     * How this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack if it is
+     * requested by the executor that the sorting is being done for and pulls 
it down if it is not.
+     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource 
availability and nodes or racks that have
+     * resources that are not requested will be ranked below . So we will be 
less likely to pick a rack that
+     * have a lot of one resource but a low amount of another and have a lot 
of resources that are not requested by the executor.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+            final ObjectResourcesSummary allResources, ExecutorDetails exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
+        NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
+        affinityBasedAllResources.getObjectResources()
+                .forEach(x -> 
x.availableResources.updateForRareResourceAffinity(requestedResources));
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+            double o2Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
+        
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is 
already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing 
executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a 
rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability 
of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if 
object references a node or a rack.
+     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource 
availability. So we will be less likely to pick
+     * a rack that have a lot of one resource but a low amount of another.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+            final ObjectResourcesSummary allResources,
+            final ExistingScheduleFunc existingScheduleFunc) {
+
+        final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+        for (ObjectResourcesItem objectResources : 
allResources.getObjectResources()) {
+            objectResources.minResourcePercent =
+                    
availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+            objectResources.avgResourcePercent =
+                    
availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+            LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, 
numExistingSchedule={}",
+                    objectResources.id, objectResources.minResourcePercent, 
objectResources.avgResourcePercent,
+                    
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+        }
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
+        sortedObjectResources.addAll(allResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled 
is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a 
node in descending
+     * order We calculate the resource availability percentage by dividing the 
resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the 
resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availHosts a collection of all the hosts we want to sort
+     * @param rackId     the rack id availNodes are a part of
+     * @return an iterable of sorted hosts.
+     */
+    private Iterable<ObjectResourcesItem> sortHosts(
+            Collection<String> availHosts, ExecutorDetails exec, String rackId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary rackResourcesSummary = new 
ObjectResourcesSummary("RACK");
+        availHosts.forEach(h -> {
+            ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+            for (RasNode x : hostnameToNodes.get(h)) {
+                hostItem.add(new ObjectResourcesItem(x.getId(), 
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+            }
+            rackResourcesSummary.addObjectResourcesItem(hostItem);
+        });
+
+        LOG.debug(
+                "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+                rackId,
+                rackResourcesSummary.getAvailableResourcesOverall(),
+                rackResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            rackResourcesSummary,
+            exec,
+            (hostId) -> {
+                AtomicInteger count = scheduledCount.get(hostId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled 
is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a 
node in descending
+     * order We calculate the resource availability percentage by dividing the 
resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the 
resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availRasNodes a list of all the nodes we want to sort
+     * @param hostId     the host-id that availNodes are a part of
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for 
nodes.
+     */
+    private Iterable<ObjectResourcesItem> sortNodes(
+            List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+            Map<String, AtomicInteger> scheduledCount) {

Review comment:
       Can you please help me understand why AtomicInteger is required? My 
understanding is that scheduling is done sequentially in one thread. 

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
##########
@@ -189,7 +207,7 @@ protected void prepareForScheduling(Cluster cluster, 
TopologyDetails topologyDet
         LOG.debug("The max state search that will be used by topology {} is 
{}", topologyDetails.getId(), maxStateSearch);
 
         searcherState = createSearcherState();
-        setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+        setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails));

Review comment:
       Do we not care about `nodeSortType` in this new 
`NodeSorterHostProximity` class?

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
##########
@@ -282,6 +285,18 @@ boolean wouldFit(
      */
     Map<String, List<String>> getNetworkTopography();
 
+    /**
+     * Get host -> rack map - the inverse of networkTopography.
+     */
+    default Map<String, String> getHostToRack() {

Review comment:
       Since this is calculated based on getNetworkTopography,  do we want to 
cache it in the implementation class like `networkTopography` instead of 
recomputing it every time?

##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {

Review comment:
       This class looks very similar to NodeSorter. Why do we want to keep both 
of them? And if we need to keep both of them, does it make sense to have 
NodeSorterHostProximity extend NodeSorter?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to