Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1359#discussion_r70393611
  
    --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java ---
    @@ -0,0 +1,417 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +
    +// for each isolated topology:
    +//   compute even distribution of executors -> workers on the number of workers specified for the topology
    +//   compute distribution of workers to machines
    +// determine host -> list of [slot, topology id, executors]
    +// iterate through hosts and: a machine is good if:
    +//   1. only running workers from one isolated topology
    +//   2. all workers running on it match one of the distributions of executors for that topology
    +//   3. matches one of the # of workers
    +// blacklist the good hosts and remove those workers from the list of workers that need to be assigned
    +// otherwise unassign all other workers for isolated topologies if assigned
    +public class IsolationScheduler implements IScheduler {
    +    private final static Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
    +
    +    private Map<String, Number> isoMachines;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        this.isoMachines = (Map<String, Number>) conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
    +        Validate.notEmpty(isoMachines);
    +    }
    +
    +    // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
    +    // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
    +    // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
    +    // blacklist all machines who had production slots defined
    +    // log isolated topologies who weren't able to get enough slots / machines
    +    // run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
    +    // set blacklist to what it was initially
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        Set<String> origBlacklist = cluster.getBlacklistedHosts();
    +        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
    +        Set<String> isoIds = isolatedTopologyIds(isoTopologies);
    +        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
    +        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
    +        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
    +
    +        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
    +            List<AssignmentInfo> assignments = entry.getValue();
    +            String topologyId = assignments.get(0).getTopologyId();
    +            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
    +            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
    +            int numWorkers = assignments.size();
    +
    +            if (isoIds.contains(topologyId)
    +                    && checkAssignmentTopology(assignments, topologyId)
    +                    && distribution.containsKey(numWorkers)
    +                    && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
    +                decrementDistribution(distribution, numWorkers);
    +                for (AssignmentInfo ass : assignments) {
    +                    workerSpecs.remove(ass.getExecutors());
    +                }
    +                cluster.blacklistHost(entry.getKey());
    +            } else {
    +                for (AssignmentInfo ass : assignments) {
    +                    if (isoIds.contains(ass.getTopologyId())) {
    +                        cluster.freeSlot(ass.getWorkerSlot());
    +                    }
    +                }
    +            }
    +        }
    +
    +        Map<String, Set<WorkerSlot>> hostUsedSlots = hostUsedSlots(cluster);
    +        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
    +        List<String> failedTopologyIds = new ArrayList<String>();
    +        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
    +            String topologyId = entry.getKey();
    +            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
    +            if (executorSet != null && executorSet.size() > 0) {
    +                failedTopologyIds.add(topologyId);
    +            }
    +            List<Integer> workerNum = distributionSortedAmts(topologyMachineDistributions.get(topologyId));
    +            for (Integer num : workerNum) {
    +                HostAssignableSlots hostSlots = hss.peek();
    +                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;
    +
    +                if (slot != null && slot.size() >= num.intValue()) {
    +                    hss.poll();
    +                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
    +                    for (WorkerSlot tmpSlot : slot.subList(0, num)) {
    +                        Set<ExecutorDetails> executor = removeElemFromSet(executorSet);
    +                        cluster.assign(tmpSlot, topologyId, executor);
    +                    }
    +                    cluster.blacklistHost(hostSlots.getHostName());
    +                }
    +            }
    +        }
    +
    +        if (failedTopologyIds.size() > 0) {
    +            LOG.warn("Unable to isolate topologies " + failedTopologyIds
    +                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
    +                    + "Clearing all other resources and will wait for enough resources for "
    +                    + "isolated topologies before allocating any other resources.");
    +            // clear workers off all hosts that are not blacklisted
    +            Map<String, Set<WorkerSlot>> usedSlots = hostUsedSlots(cluster);
    +            Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet();
    +            for (Map.Entry<String, Set<WorkerSlot>> entry : entries) {
    +                if (!cluster.isBlacklistedHost(entry.getKey())) {
    +                    cluster.freeSlots(entry.getValue());
    +                }
    +            }
    +        } else {
    +            // run default scheduler on non-isolated topologies
    +            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
    +            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
    +            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
    +        }
    +        cluster.setBlacklistedHosts(origBlacklist);
    +    }
    +
    +    public Set<ExecutorDetails> removeElemFromSet(Set<Set<ExecutorDetails>> executorsSets) {
    +        Set<ExecutorDetails> elem = executorsSets.iterator().next();
    +        executorsSets.remove(elem);
    +        return elem;
    +    }
    +
    +    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
    +        Set<String> topologyNames = isoMachines.keySet();
    +        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
    +        for (TopologyDetails topo : topologies) {
    +            if (topologyNames.contains(topo.getName())) {
    +                isoTopologies.add(topo);
    +            }
    +        }
    +        return isoTopologies;
    +    }
    +
    +    private Set<String> isolatedTopologyIds(List<TopologyDetails> topologies) {
    +        Set<String> ids = new HashSet<String>();
    +        if (topologies != null && topologies.size() > 0) {
    +            for (TopologyDetails topology : topologies) {
    +                ids.add(topology.getId());
    +            }
    +        }
    +        return ids;
    +    }
    +
    +    // map from topology id -> set of sets of executors
    +    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
    +        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
    +        for (TopologyDetails topology : topologies) {
    +            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
    +        }
    +        return workerSpecs;
    +    }
    +
    +    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
    +        Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values();
    +        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
    +
    +        for (SchedulerAssignment sa : assignmentValues) {
    +            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
    +            Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
    +            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
    +                WorkerSlot slot = entry.getKey();
    +                List<ExecutorDetails> executors = entry.getValue();
    +
    +                String host = cluster.getHost(slot.getNodeId());
    +                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
    +                List<AssignmentInfo> executorList = hostAssignments.get(host);
    +                if (executorList == null) {
    +                    executorList = new ArrayList<AssignmentInfo>();
    +                    hostAssignments.put(host, executorList);
    +                }
    +                executorList.add(ass);
    +            }
    +        }
    +        return hostAssignments;
    +    }
    +
    +    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
    +        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
    +
    +        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
    +        Collection<List<ExecutorDetails>> values = compExecutors.values();
    +        for (List<ExecutorDetails> eList : values) {
    +            allExecutors.addAll(eList);
    +        }
    +
    +        int numWorkers = topology.getNumWorkers();
    +        int bucketIndex = 0;
    +        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
    +        for (ExecutorDetails executor : allExecutors) {
    +            Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
    +            if (executors == null) {
    +                executors = new HashSet<ExecutorDetails>();
    +                bucketExecutors.put(bucketIndex, executors);
    +            }
    +            executors.add(executor);
    +            bucketIndex = (bucketIndex+1) % numWorkers;
    +        }
    +
    +        return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
    +    }
    +
    +    private Map<String, Map<Integer, Integer>> topologyMachineDistributions(List<TopologyDetails> isoTopologies) {
    +        Map<String, Map<Integer, Integer>> machineDistributions = new HashMap<String, Map<Integer, Integer>>();
    +        for (TopologyDetails topology : isoTopologies) {
    +            machineDistributions.put(topology.getId(), machineDistribution(topology));
    +        }
    +        return machineDistributions;
    +    }
    +
    +    private Map<Integer, Integer> machineDistribution(TopologyDetails topology) {
    +        int machineNum = isoMachines.get(topology.getName()).intValue();
    +        int workerNum = topology.getNumWorkers();
    +        TreeMap<Integer, Integer> distribution = Utils.integerDivided(workerNum, machineNum);
    +
    +        if (distribution.containsKey(0)) {
    +            distribution.remove(0);
    +        }
    +        return distribution;
    +    }
    +
    +    private boolean checkAssignmentTopology(List<AssignmentInfo> assignments, String topologyId) {
    +        for (AssignmentInfo ass : assignments) {
    +            if (!topologyId.equals(ass.getTopologyId())) {
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private boolean checkAssignmentWorkerSpecs(List<AssignmentInfo> assignments, Set<Set<ExecutorDetails>> workerSpecs) {
    +        for (AssignmentInfo ass : assignments) {
    +            if (!workerSpecs.contains(ass.getExecutors())) {
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    private void decrementDistribution(Map<Integer, Integer> distribution, int value) {
    +        Integer distValue = distribution.get(value);
    +        if (distValue != null) {
    +            int newValue = distValue - 1;
    +            if (newValue == 0) {
    +                distribution.remove(value);
    +            } else {
    +                distribution.put(value, newValue);
    +            }
    +        }
    +    }
    +
    +    private Map<String, Set<WorkerSlot>> hostUsedSlots(Cluster cluster) {
    +        Collection<WorkerSlot> usedSlots = cluster.getUsedSlots();
    +        Map<String, Set<WorkerSlot>> hostUsedSlots = new HashMap<String, Set<WorkerSlot>>();
    +        for (WorkerSlot slot : usedSlots) {
    +            String host = cluster.getHost(slot.getNodeId());
    +            Set<WorkerSlot> slots = hostUsedSlots.get(host);
    +            if (slots == null) {
    +                slots = new HashSet<WorkerSlot>();
    +                hostUsedSlots.put(host, slots);
    +            }
    +            slots.add(slot);
    +        }
    +        return hostUsedSlots;
    +    }
    +
    +    // returns list of list of slots, reverse sorted by number of slots
    +    private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
    +        List<WorkerSlot> assignableSlots = cluster.getAssignableSlots();
    +        Map<String, List<WorkerSlot>> hostAssignableSlots = new HashMap<String, List<WorkerSlot>>();
    +        for (WorkerSlot slot : assignableSlots) {
    +            String host = cluster.getHost(slot.getNodeId());
    +            List<WorkerSlot> slots = hostAssignableSlots.get(host);
    +            if (slots == null) {
    +                slots = new ArrayList<WorkerSlot>();
    +                hostAssignableSlots.put(host, slots);
    +            }
    +            slots.add(slot);
    +        }
    +        List<HostAssignableSlots> sortHostAssignSlots = new ArrayList<HostAssignableSlots>();
    +        for (Map.Entry<String, List<WorkerSlot>> entry : hostAssignableSlots.entrySet()) {
    +            sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue()));
    +        }
    +        Collections.sort(sortHostAssignSlots, new Comparator<HostAssignableSlots>() {
    +                    @Override
    +                    public int compare(HostAssignableSlots o1, HostAssignableSlots o2) {
    +                        return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
    +                    }
    +                });
    +        Collections.shuffle(sortHostAssignSlots);
    +
    +        return new LinkedList<HostAssignableSlots>(sortHostAssignSlots);
    +    }
    +
    +    private List<Integer> distributionSortedAmts(Map<Integer, Integer> distributions) {
    +        List<Integer> sorts = new ArrayList<Integer>();
    +        for (Map.Entry<Integer, Integer> entry : distributions.entrySet()) {
    +            int workers = entry.getKey();
    +            int machines = entry.getValue();
    +            for (int i = 0; i < machines; i++) {
    +                sorts.add(workers);
    +            }
    +        }
    +        Collections.sort(sorts, new Comparator<Integer>() {
    +            @Override
    +            public int compare(Integer o1, Integer o2) {
    +                return o1.intValue() - o2.intValue();
    --- End diff --
    
    The sort order should be reversed: `return o2.intValue() - o1.intValue()`
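    
    A minimal sketch of how the helper could read with the order flipped, just to illustrate the suggestion (not the exact patch): since `hostAssignableSlots(...)` is consumed biggest-host-first, sorting the per-machine worker counts largest-first lets the biggest requirements claim the hosts with the most free slots. `Collections.reverseOrder()` gives the same descending order without a hand-written comparator:
    
    ```java
    // Illustration only: distributionSortedAmts with the per-machine worker
    // counts sorted in descending order (largest requirement first).
    private List<Integer> distributionSortedAmts(Map<Integer, Integer> distributions) {
        List<Integer> sorts = new ArrayList<Integer>();
        for (Map.Entry<Integer, Integer> entry : distributions.entrySet()) {
            int workers = entry.getKey();
            int machines = entry.getValue();
            for (int i = 0; i < machines; i++) {
                sorts.add(workers);
            }
        }
        Collections.sort(sorts, Collections.reverseOrder());
        return sorts;
    }
    ```
    
    Simply swapping the operands in the existing comparator works just as well; the rest of the method can stay as it is in the diff.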

