Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194951
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +            }
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of states to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert execs != null;
    +            assert !execs.isEmpty();
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, 
workerSlot);
    +            //It is possible that this component is already scheduled on 
this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = 
workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new 
HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = 
nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index 
became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, 
workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of constraints
    +        Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked 
as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, 
nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum 
number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", 
execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on worker.
    +     * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", 
exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", 
exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfies spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, 
execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not 
valid!", comp1);
    +                    continue;
    +                }
    +                if (!matrix.containsKey(comp2)) {
    +                    LOG.warn("Comp: {} declared in constraints is not 
valid!", comp2);
    +                    continue;
    +                }
    +                matrix.get(comp1).put(comp2, 1);
    +                matrix.get(comp2).put(comp1, 1);
    +            }
    +        }
    +        return matrix;
    +    }
    +
    +    /**
    +     * Determines if a scheduling is valid and all constraints are satisfied.
    +     */
    +    @VisibleForTesting
    +    public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    +        return checkSpreadSchedulingValid(cluster, td)
    +            && checkConstraintsSatisfied(cluster, td)
    +            && checkResourcesCorrect(cluster, td);
    +    }
    +
    +    /**
    +     * Check if constraints are satisfied.
    +     */
    +    private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking constraints...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        //get topology constraints
    +        Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
    +
    +        Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            WorkerSlot worker = entry.getValue();
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = execToComp.get(exec);
    +            if (!workerCompMap.containsKey(worker)) {
    +                workerCompMap.put(worker, new LinkedList<>());
    +            }
    +            workerCompMap.get(worker).add(comp);
    +        }
    +        for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) {
    +            List<String> comps = entry.getValue();
    +            for (int i = 0; i < comps.size(); i++) {
    +                for (int j = 0; j < comps.size(); j++) {
    +                    if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) {
    +                        LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
    +                            comps.get(i), comps.get(j), entry.getKey());
    +                        return false;
    +                    }
    +                }
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
    +        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
    +        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
    +            for (WorkerSlot s : node.getUsedSlots()) {
    +                workerToNodes.put(s, node);
    +            }
    +        }
    +        return workerToNodes;
    +    }
    +
    +    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking for a valid scheduling...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    +        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
    +        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
    +        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
    +        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
    +        boolean ret = true;
    +
    +        HashSet<String> spreadComps = getSpreadComps(topo);
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = workerToNodes.get(worker);
    +
    +            if (!workerExecMap.containsKey(worker)) {
    +                workerExecMap.put(worker, new HashSet<>());
    +                workerCompMap.put(worker, new HashSet<>());
    +            }
    +
    +            if (!nodeCompMap.containsKey(node)) {
    +                nodeCompMap.put(node, new HashSet<>());
    +            }
    +            if (workerExecMap.get(worker).contains(exec)) {
    +                LOG.error("Incorrect Scheduling: Found duplicate in 
scheduling");
    +                return false;
    +            }
    +            workerExecMap.get(worker).add(exec);
    +            String comp = execToComp.get(exec);
    +            workerCompMap.get(worker).add(comp);
    +            if (spreadComps.contains(comp)) {
    +                if (nodeCompMap.get(node).contains(comp)) {
    +                    LOG.error("Incorrect Scheduling: Spread for Component: 
{} {} on node {} not satisfied {}",
    +                        comp, exec, node.getId(), nodeCompMap.get(node));
    +                    ret = false;
    +                }
    +            }
    +            nodeCompMap.get(node).add(comp);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Check if resource constraints are satisfied.
    +     */
    +    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    +        LOG.info("Checking Resources...");
    +        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    +        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();
    +        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        //merge with existing assignments
    +        if (cluster.getAssignmentById(topo.getId()) != null
    +                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) {
    +            mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
    +        }
    +        mergedExecToWorker.putAll(result);
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            WorkerSlot worker = entry.getValue();
    +            RAS_Node node = nodes.get(worker.getNodeId());
    +
    +            if (node.getAvailableMemoryResources() < 0.0 && node.getAvailableCpuResources() < 0.0) {
    +                LOG.error("Incorrect Scheduling: found node with negative available resources");
    +                return false;
    +            }
    +            if (!nodeToExecs.containsKey(node)) {
    +                nodeToExecs.put(node, new LinkedList<>());
    +            }
    +            nodeToExecs.get(node).add(exec);
    +        }
    +
    +        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
    +            RAS_Node node = entry.getKey();
    +            Collection<ExecutorDetails> execs = entry.getValue();
    +            double cpuUsed = 0.0;
    +            double memoryUsed = 0.0;
    +            for (ExecutorDetails exec : execs) {
    +                cpuUsed += topo.getTotalCpuReqTask(exec);
    +                memoryUsed += topo.getTotalMemReqTask(exec);
    +            }
    +            if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
    +                return false;
    +            }
    +            if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
    +                LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
    +                        + " Actual: {} Executors scheduled on node: {}",
    +                        node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    +        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) {
    +            ExecutorDetails exec = entry.getKey();
    +            String comp = entry.getValue();
    +            if (!retMap.containsKey(comp)) {
    +                retMap.put(comp, new HashSet<>());
    +            }
    +            retMap.get(comp).add(exec);
    --- End diff ---
    
    Should we make this more Java 8 like? For example, something along the lines of the sketch below.
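    
    A minimal sketch of one option, keeping the same helper name and signature (untested against this branch):
    
    ```java
    private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
        Map<String, Set<ExecutorDetails>> retMap = new HashMap<>();
        // computeIfAbsent replaces the explicit containsKey/put dance
        executorToComp.forEach((exec, comp) -> retMap.computeIfAbsent(comp, k -> new HashSet<>()).add(exec));
        return retMap;
    }
    ```
    
    Or, going further, the whole method body could collapse into a stream (would need java.util.stream.Collectors):
    
    ```java
    return executorToComp.entrySet().stream()
        .collect(Collectors.groupingBy(Map.Entry::getValue,
            Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
    ```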

