[ https://issues.apache.org/jira/browse/STORM-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276454#comment-15276454 ]

ASF GitHub Bot commented on STORM-1239:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1359#discussion_r62511356
  
    --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java ---
    @@ -0,0 +1,434 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +
    +public class IsolationScheduler implements IScheduler {
    +    private final static Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
    +
    +    private Map conf;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +    }
    +
    +    // for each isolated topology:
    +    //   compute an even distribution of executors -> workers for the number of workers specified for the topology
    +    //   compute the distribution of workers to machines
    +    // determine host -> list of [slot, topology id, executors]
    +    // iterate through hosts; a machine is good if:
    +    //   1. it is only running workers from one isolated topology
    +    //   2. all workers running on it match one of the distributions of executors for that topology
    +    //   3. it matches one of the # of workers
    +    // blacklist the good hosts and remove their workers from the list of workers that need to be assigned
    +    // otherwise unassign all other workers for isolated topologies if assigned
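    +    //
    +    // Illustrative example (not from the patch): an isolated topology with 10 executors,
    +    // 4 workers and 2 dedicated machines gets worker specs of sizes 3, 3, 2, 2 (executors
    +    // spread round-robin across the workers) and a machine distribution of 2 workers on
    +    // each of the 2 machines.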
    +
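    +    // Removes and returns an arbitrary element from the set of executor sets, or null if the set is empty.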
    +    public Set<ExecutorDetails> removeElemFromSet(Set<Set<ExecutorDetails>> executorsSets) {
    +        if (executorsSets.iterator().hasNext()) {
    +            Set<ExecutorDetails> elem = executorsSets.iterator().next();
    +            executorsSets.remove(elem);
    +            return elem;
    +        } else {
    +            return null;
    +        }
    +    }
    +
    +    // get host -> all assignable worker slots for non-blacklisted machines (assigned or not)
    +    // will then have a list of machines that need to be assigned (machine -> [topology, list of lists of executors])
    +    // match each spec to a machine (one that has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
    +    // blacklist all machines that had production slots defined
    +    // log isolated topologies that weren't able to get enough slots / machines
    +    // run the default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on the remaining machines
    +    // set the blacklist back to what it was initially
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        Set<String> origBlacklist = cluster.getBlacklistedHosts();
    +        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
    +        Set<String> isoIds = isolatedTopologyIds(isoTopologies);
    +        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
    +        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
    +        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
    +
    +        Set<Map.Entry<String, List<AssignmentInfo>>> hostAssignEntries = hostAssignments.entrySet();
    +        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignEntries) {
    +            List<AssignmentInfo> assInfos = entry.getValue();
    +            String topologyId = assInfos.get(0).getTopologyId();
    +            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
    +            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
    +            int numWorkers = assInfos.size();
    +
    +            if (isoIds.contains(topologyId)
    +                    && checkAssignmentTopology(assInfos, topologyId)
    +                    && (distribution != null && distribution.containsKey(numWorkers))
    +                    && checkAssignmentWorkerSpecs(assInfos, workerSpecs)) {
    +                decrementDistribution(distribution, numWorkers);
    +                for (AssignmentInfo ass : assInfos) {
    +                    workerSpecs.remove(ass.getExecutors());
    +                }
    +                cluster.blacklistHost(entry.getKey());
    +            } else {
    +                for (AssignmentInfo ass : assInfos) {
    +                    if (isoIds.contains(ass.getTopologyId())) {
    +                        cluster.freeSlot(ass.getWorkerSlot());
    +                    }
    +                }
    +            }
    +        }
    +
    +        Map<String, Set<WorkerSlot>> hostUsedSlots = hostUsedSlots(cluster);
    +        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
    +        List<String> failedTopologyIds = new ArrayList<String>();
    +        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : 
topologyWorkerSpecs.entrySet()) {
    +            String topologyId = entry.getKey();
    +            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
    +            if (executorSet != null && executorSet.size() > 0) {
    +                failedTopologyIds.add(topologyId);
    +            }
    +            List<Integer> workerNum = distributionSortedAmts(topologyMachineDistributions.get(topologyId));
    +            for (Integer num : workerNum) {
    +                HostAssignableSlots hostSlots = hss.peek();
    +                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;
    +
    +                if (slot != null && slot.size() >= num.intValue()) {
    +                    hss.poll();
    +                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
    +                    for (WorkerSlot tmpSlot : slot.subList(0, num)) {
    +                        Set<ExecutorDetails> executor = 
removeElemFromSet(executorSet);
    +                        cluster.assign(tmpSlot, topologyId, executor);
    +                    }
    +                    cluster.blacklistHost(hostSlots.getHostName());
    +                }
    +            }
    +        }
    +
    +        if (failedTopologyIds.size() > 0) {
    +            LOG.warn("Unable to isolate topologies " + failedTopologyIds
    +                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
    +                    + "Clearing all other resources and will wait for enough resources for "
    +                    + "isolated topologies before allocating any other resources.");
    +            // clear workers off all hosts that are not blacklisted
    +            Map<String, Set<WorkerSlot>> usedSlots = hostUsedSlots(cluster);
    +            Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet();
    +            for (Map.Entry<String, Set<WorkerSlot>> entry : entries) {
    +                if (!cluster.isBlacklistedHost(entry.getKey())) {
    +                    cluster.freeSlots(entry.getValue());
    +                }
    +            }
    +        } else {
    +            // run default scheduler on non-isolated topologies
    +            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
    +            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
    +            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
    +        }
    +        cluster.setBlacklistedHosts(origBlacklist);
    +    }
    +
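    +    // Topologies whose names appear in the Config.ISOLATION_SCHEDULER_MACHINES map, or null if that config is not set.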
    +    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
    +        Object isoMachines = conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
    +        if (isoMachines == null) {
    +            return null;
    +        }
    +
    +        Set<String> topologyNames = ((Map<String, Number>) isoMachines).keySet();
    +        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
    +        for (TopologyDetails topo : topologies) {
    +            if (topologyNames.contains(topo.getName())) {
    +                isoTopologies.add(topo);
    +            }
    +        }
    +        return isoTopologies;
    +    }
    +
    +    private Set<String> isolatedTopologyIds(List<TopologyDetails> topologies) {
    +        Set<String> ids = new HashSet<String>();
    +        if (topologies != null && topologies.size() > 0) {
    +            for (TopologyDetails topology : topologies) {
    +                ids.add(topology.getId());
    +            }
    +        }
    +        return ids;
    +    }
    +
    +    // map from topology id -> set of sets of executors
    +    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
    +        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
    +        for (TopologyDetails topology : topologies) {
    +            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
    +        }
    +        return workerSpecs;
    +    }
    +
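    +    // host -> list of (worker slot, topology id, executors) for every assignment currently in the cluster.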
    +    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
    +        Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values();
    +        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
    +
    +        for (SchedulerAssignment sa : assignmentValues) {
    +            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
    +            Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
    +            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
    +                WorkerSlot slot = entry.getKey();
    +                List<ExecutorDetails> executors = entry.getValue();
    +
    +                String host = cluster.getHost(slot.getNodeId());
    +                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
    +                List<AssignmentInfo> executorList = hostAssignments.get(host);
    +                if (executorList == null) {
    +                    executorList = new ArrayList<AssignmentInfo>();
    +                    hostAssignments.put(host, executorList);
    +                }
    +                executorList.add(ass);
    +            }
    +        }
    +        return hostAssignments;
    +    }
    +
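    +    // Distribute the topology's executors round-robin into numWorkers buckets, one executor set per worker.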
    +    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
    +        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
    +
    +        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
    +        Collection<List<ExecutorDetails>> values = compExecutors.values();
    +        for (List<ExecutorDetails> eList : values) {
    +            allExecutors.addAll(eList);
    +        }
    +
    +        int numWorkers = topology.getNumWorkers();
    +        int bucketIndex = 0;
    +        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
    +        for (ExecutorDetails executor : allExecutors) {
    +            Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
    +            if (executors == null) {
    +                executors = new HashSet<ExecutorDetails>();
    +                bucketExecutors.put(bucketIndex, executors);
    +            }
    +            executors.add(executor);
    +            bucketIndex = (++bucketIndex) % numWorkers;
    --- End diff --
    
    Done.
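
    For readers following the patch: below is a minimal sketch (not part of this pull request) of the configuration the scheduler above consumes. The class name, topology names and machine counts are made up for illustration; in a real deployment the map would come from isolation.scheduler.machines in storm.yaml on Nimbus, with storm.scheduler pointing at org.apache.storm.scheduler.IsolationScheduler.

        import java.util.HashMap;
        import java.util.Map;

        import org.apache.storm.Config;
        import org.apache.storm.scheduler.IsolationScheduler;

        public class IsolationSchedulerConfigSketch {
            public static void main(String[] args) {
                // isolation.scheduler.machines: topology name -> number of dedicated machines
                Map<String, Number> isoMachines = new HashMap<String, Number>();
                isoMachines.put("my-isolated-topology", 2);
                isoMachines.put("other-isolated-topology", 1);

                Map<String, Object> conf = new HashMap<String, Object>();
                conf.put(Config.ISOLATION_SCHEDULER_MACHINES, isoMachines);

                IsolationScheduler scheduler = new IsolationScheduler();
                scheduler.prepare(conf);
                // Nimbus would then invoke scheduler.schedule(topologies, cluster) on each scheduling round.
            }
        }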


> port  backtype.storm.scheduler.IsolationScheduler to java
> ---------------------------------------------------------
>
>                 Key: STORM-1239
>                 URL: https://issues.apache.org/jira/browse/STORM-1239
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Xin Wang
>              Labels: java-migration, jstorm-merger
>
> port the isolation scheduler to java


