[ https://issues.apache.org/jira/browse/STORM-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276454#comment-15276454 ]
ASF GitHub Bot commented on STORM-1239:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1359#discussion_r62511356

    --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java ---
    @@ -0,0 +1,434 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class IsolationScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
    +
    +    private Map conf;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +    }
    +
    +    // For each isolated topology:
    +    //   compute the even distribution of executors -> workers for the number of workers specified for the topology
    +    //   compute the distribution of workers -> machines
    +    // Determine host -> list of [slot, topology id, executors].
    +    // Iterate through hosts; a machine is "good" if:
    +    //   1. it is only running workers from one isolated topology
    +    //   2. all workers running on it match one of the distributions of executors for that topology
    +    //   3. its number of workers matches one of the per-machine worker counts in the topology's machine distribution
    +    // Blacklist the good hosts and remove their workers from the list of workers that still need to be assigned;
    +    // otherwise unassign all other workers for isolated topologies, if assigned.
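    +
    +    // Example (editor's illustration, not part of this diff): the isolation
    +    // config is a storm.yaml map from topology name to the number of dedicated
    +    // machines, e.g.
    +    //
    +    //   isolation.scheduler.machines:
    +    //     "my-iso-topology": 4
    +    //
    +    // which is why conf.get(Config.ISOLATION_SCHEDULER_MACHINES) is cast to a
    +    // Map<String, Number> in isolatedTopologies() below.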
    +
    +    // Removes and returns an arbitrary element from the set of executor sets,
    +    // or null if the set is empty.
    +    public Set<ExecutorDetails> removeElemFromSet(Set<Set<ExecutorDetails>> executorsSets) {
    +        Iterator<Set<ExecutorDetails>> it = executorsSets.iterator();
    +        if (it.hasNext()) {
    +            Set<ExecutorDetails> elem = it.next();
    +            it.remove();
    +            return elem;
    +        } else {
    +            return null;
    +        }
    +    }
    +
    +    // Get host -> all assignable worker slots for non-blacklisted machines (assigned or not).
    +    // This yields the list of machines that still need assignment (machine -> [topology, list of lists of executors]).
    +    // Match each spec to a machine that has the right number of workers, free everything else
    +    // on that machine, and assign those slots (one topology at a time).
    +    // Blacklist all machines that had production slots defined.
    +    // Log isolated topologies that were unable to get enough slots/machines.
    +    // Run the default scheduler on isolated topologies that did not get enough slots, plus
    +    // non-isolated topologies, on the remaining machines.
    +    // Finally, restore the blacklist to its initial value.
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        Set<String> origBlacklist = cluster.getBlacklistedHosts();
    +        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
    +        Set<String> isoIds = isolatedTopologyIds(isoTopologies);
    +        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
    +        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
    +        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
    +
    +        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
    +            List<AssignmentInfo> assInfos = entry.getValue();
    +            String topologyId = assInfos.get(0).getTopologyId();
    +            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
    +            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
    +            int numWorkers = assInfos.size();
    +
    +            if (isoIds.contains(topologyId)
    +                    && checkAssignmentTopology(assInfos, topologyId)
    +                    && distribution != null && distribution.containsKey(numWorkers)
    +                    && checkAssignmentWorkerSpecs(assInfos, workerSpecs)) {
    +                decrementDistribution(distribution, numWorkers);
    +                for (AssignmentInfo ass : assInfos) {
    +                    workerSpecs.remove(ass.getExecutors());
    +                }
    +                cluster.blacklistHost(entry.getKey());
    +            } else {
    +                for (AssignmentInfo ass : assInfos) {
    +                    if (isoIds.contains(ass.getTopologyId())) {
    +                        cluster.freeSlot(ass.getWorkerSlot());
    +                    }
    +                }
    +            }
    +        }
    +
    +        Map<String, Set<WorkerSlot>> hostUsedSlots = hostUsedSlots(cluster);
    +        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
    +        List<String> failedTopologyIds = new ArrayList<String>();
    +        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
    +            String topologyId = entry.getKey();
    +            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
    +            List<Integer> workerNum = distributionSortedAmts(topologyMachineDistributions.get(topologyId));
    +            for (Integer num : workerNum) {
    +                HostAssignableSlots hostSlots = hss.peek();
    +                List<WorkerSlot> slots = hostSlots != null ? hostSlots.getWorkerSlots() : null;
    +
    +                if (slots != null && slots.size() >= num.intValue()) {
    +                    hss.poll();
    +                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
    +                    for (WorkerSlot slot : slots.subList(0, num)) {
    +                        Set<ExecutorDetails> executor = removeElemFromSet(executorSet);
    +                        cluster.assign(slot, topologyId, executor);
    +                    }
    +                    cluster.blacklistHost(hostSlots.getHostName());
    +                }
    +            }
    +            // Check for leftover specs only after trying to place them; otherwise every
    +            // isolated topology needing fresh placement would be reported as failed.
    +            if (executorSet != null && !executorSet.isEmpty()) {
    +                failedTopologyIds.add(topologyId);
    +            }
    +        }
    +
    +        if (!failedTopologyIds.isEmpty()) {
    +            LOG.warn("Unable to isolate topologies " + failedTopologyIds
    +                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
    +                    + "Clearing all other resources and will wait for enough resources for "
    +                    + "isolated topologies before allocating any other resources.");
    +            // Clear workers off all hosts that are not blacklisted.
    +            Map<String, Set<WorkerSlot>> usedSlots = hostUsedSlots(cluster);
    +            for (Map.Entry<String, Set<WorkerSlot>> entry : usedSlots.entrySet()) {
    +                if (!cluster.isBlacklistedHost(entry.getKey())) {
    +                    cluster.freeSlots(entry.getValue());
    +                }
    +            }
    +        } else {
    +            // Run the default scheduler on the non-isolated topologies.
    +            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
    +            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
    +            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
    +        }
    +        cluster.setBlacklistedHosts(origBlacklist);
    +    }
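    +
    +    // Worked example (editor's illustration, not part of this diff): a topology
    +    // isolated onto 3 machines with 7 workers is split as evenly as possible,
    +    // giving machines with 3, 2, and 2 workers, i.e. a machine distribution of
    +    // {3=1, 2=2} (workers per machine -> number of machines). Its executors are
    +    // likewise dealt round-robin into 7 worker specs by computeWorkerSpecs().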
    +
    +    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
    +        Object isoMachines = conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
    +        if (isoMachines == null) {
    +            // Return an empty list rather than null so callers such as
    +            // topologyWorkerSpecs() can iterate without a null check.
    +            return new ArrayList<TopologyDetails>();
    +        }
    +
    +        Set<String> topologyNames = ((Map<String, Number>) isoMachines).keySet();
    +        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
    +        for (TopologyDetails topo : topologies) {
    +            if (topologyNames.contains(topo.getName())) {
    +                isoTopologies.add(topo);
    +            }
    +        }
    +        return isoTopologies;
    +    }
    +
    +    private Set<String> isolatedTopologyIds(List<TopologyDetails> topologies) {
    +        Set<String> ids = new HashSet<String>();
    +        if (topologies != null) {
    +            for (TopologyDetails topology : topologies) {
    +                ids.add(topology.getId());
    +            }
    +        }
    +        return ids;
    +    }
    +
    +    // Map from topology id -> set of sets of executors.
    +    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
    +        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
    +        for (TopologyDetails topology : topologies) {
    +            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
    +        }
    +        return workerSpecs;
    +    }
    +
    +    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
    +        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
    +
    +        for (SchedulerAssignment sa : cluster.getAssignments().values()) {
    +            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
    +            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : slotExecutors.entrySet()) {
    +                WorkerSlot slot = entry.getKey();
    +                List<ExecutorDetails> executors = entry.getValue();
    +
    +                String host = cluster.getHost(slot.getNodeId());
    +                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
    +                List<AssignmentInfo> assignments = hostAssignments.get(host);
    +                if (assignments == null) {
    +                    assignments = new ArrayList<AssignmentInfo>();
    +                    hostAssignments.put(host, assignments);
    +                }
    +                assignments.add(ass);
    +            }
    +        }
    +        return hostAssignments;
    +    }
    +
    +    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
    +        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
    +
    +        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
    +        for (List<ExecutorDetails> eList : compExecutors.values()) {
    +            allExecutors.addAll(eList);
    +        }
    +
    +        int numWorkers = topology.getNumWorkers();
    +        int bucketIndex = 0;
    +        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
    +        // Deal executors round-robin into numWorkers buckets.
    +        for (ExecutorDetails executor : allExecutors) {
    +            Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
    +            if (executors == null) {
    +                executors = new HashSet<ExecutorDetails>();
    +                bucketExecutors.put(bucketIndex, executors);
    +            }
    +            executors.add(executor);
    +            bucketIndex = (bucketIndex + 1) % numWorkers;
    --- End diff --

    Done.
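
Editor's aside: the round-robin bucketing in computeWorkerSpecs() is the core of how executors are split into worker specs. Below is a minimal, self-contained sketch of that pattern under stated assumptions; it is not part of the PR, plain integers stand in for ExecutorDetails, and BucketingSketch/bucketRoundRobin are names invented for this illustration.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class BucketingSketch {
        // Deal items round-robin into numBuckets buckets, mirroring the
        // computeWorkerSpecs logic in the diff above.
        static <T> Set<Set<T>> bucketRoundRobin(List<T> items, int numBuckets) {
            Map<Integer, Set<T>> buckets = new HashMap<Integer, Set<T>>();
            int idx = 0;
            for (T item : items) {
                Set<T> bucket = buckets.get(idx);
                if (bucket == null) {
                    bucket = new HashSet<T>();
                    buckets.put(idx, bucket);
                }
                bucket.add(item);
                idx = (idx + 1) % numBuckets;
            }
            return new HashSet<Set<T>>(buckets.values());
        }

        public static void main(String[] args) {
            List<Integer> executors = new ArrayList<Integer>();
            for (int i = 0; i < 7; i++) {
                executors.add(i);
            }
            // 7 executors dealt into 3 workers -> bucket sizes 3, 2, 2.
            System.out.println(bucketRoundRobin(executors, 3));
        }
    }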

> port backtype.storm.scheduler.IsolationScheduler to java
> ---------------------------------------------------------
>
>                 Key: STORM-1239
>                 URL: https://issues.apache.org/jira/browse/STORM-1239
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Xin Wang
>              Labels: java-migration, jstorm-merger
>
> port the isolation scheduler to java

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)