[
https://issues.apache.org/jira/browse/STORM-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372503#comment-15372503
]
ASF GitHub Bot commented on STORM-1239:
---------------------------------------
Github user vesense commented on a diff in the pull request:
https://github.com/apache/storm/pull/1359#discussion_r70394061
--- Diff: storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java ---
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// for each isolated topology:
+//   compute an even distribution of executors -> workers for the number of workers specified for the topology
+//   compute the distribution of workers to machines
+// determine host -> list of [slot, topology id, executors]
+// iterate through hosts; a machine is good if:
+//   1. it only runs workers from one isolated topology
+//   2. all workers running on it match one of the distributions of executors for that topology
+//   3. it matches one of the distribution's worker counts
+// blacklist the good hosts and remove those workers from the list of workers that need to be assigned
+// otherwise, unassign all other workers for isolated topologies if assigned
+public class IsolationScheduler implements IScheduler {
+    private static final Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
+
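+    // topology name -> number of machines dedicated to that topology, taken from
+    // Config.ISOLATION_SCHEDULER_MACHINES, e.g. isolation.scheduler.machines: {"my-topology": 2}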
+    private Map<String, Number> isoMachines;
+
+    @Override
+    public void prepare(Map conf) {
+        this.isoMachines = (Map<String, Number>) conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
+        Validate.notEmpty(isoMachines);
+    }
+
+    // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+    // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+    // match each spec to a machine (which has the right number of workers), free everything else on that machine, and assign those slots (do one topology at a time)
+    // blacklist all machines that had production slots defined
+    // log isolated topologies that weren't able to get enough slots/machines
+    // run the default scheduler on isolated topologies that didn't have enough slots, plus non-isolated topologies, on the remaining machines
+    // set the blacklist back to what it was initially
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        Set<String> origBlacklist = cluster.getBlacklistedHosts();
+        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
+        Set<String> isoIds = isolatedTopologyIds(isoTopologies);
+        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
+        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
+        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
+
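+        // first pass: blacklist hosts whose current assignments already satisfy one
+        // isolated topology's worker spec, and free every other isolated-topology worker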
+        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
+            List<AssignmentInfo> assignments = entry.getValue();
+            String topologyId = assignments.get(0).getTopologyId();
+            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
+            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
+            int numWorkers = assignments.size();
+
+            if (isoIds.contains(topologyId)
+                    && checkAssignmentTopology(assignments, topologyId)
+                    && distribution.containsKey(numWorkers)
+                    && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
+                decrementDistribution(distribution, numWorkers);
+                for (AssignmentInfo ass : assignments) {
+                    workerSpecs.remove(ass.getExecutors());
+                }
+                cluster.blacklistHost(entry.getKey());
+            } else {
+                for (AssignmentInfo ass : assignments) {
+                    if (isoIds.contains(ass.getTopologyId())) {
+                        cluster.freeSlot(ass.getWorkerSlot());
+                    }
+                }
+            }
+        }
+
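+        // second pass: for each remaining worker spec, find a host with enough
+        // assignable slots, free whatever ran there, and assign the spec to it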
+        Map<String, Set<WorkerSlot>> hostUsedSlots = hostUsedSlots(cluster);
+        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
+        List<String> failedTopologyIds = new ArrayList<String>();
+        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
+            String topologyId = entry.getKey();
+            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
+            List<Integer> workerNums = distributionSortedAmts(topologyMachineDistributions.get(topologyId));
+            for (Integer num : workerNums) {
+                HostAssignableSlots hostSlots = hss.peek();
+                List<WorkerSlot> slots = hostSlots != null ? hostSlots.getWorkerSlots() : null;
+
+                if (slots != null && slots.size() >= num.intValue()) {
+                    hss.poll();
+                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
+                    for (WorkerSlot tmpSlot : slots.subList(0, num)) {
+                        Set<ExecutorDetails> executor = removeElemFromSet(executorSet);
+                        cluster.assign(tmpSlot, topologyId, executor);
+                    }
+                    cluster.blacklistHost(hostSlots.getHostName());
+                }
+            }
+            // check for leftover specs only after trying to place them on hosts
+            if (executorSet != null && executorSet.size() > 0) {
+                failedTopologyIds.add(topologyId);
+            }
+        }
+
+        if (failedTopologyIds.size() > 0) {
+            LOG.warn("Unable to isolate topologies " + failedTopologyIds
+                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
+                    + "Clearing all other resources and will wait for enough resources for "
+                    + "isolated topologies before allocating any other resources.");
+            // clear workers off all hosts that are not blacklisted
+            Map<String, Set<WorkerSlot>> usedSlots = hostUsedSlots(cluster);
+            Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet();
+            for (Map.Entry<String, Set<WorkerSlot>> entry : entries) {
+                if (!cluster.isBlacklistedHost(entry.getKey())) {
+                    cluster.freeSlots(entry.getValue());
+                }
+            }
+        } else {
+            // run the default scheduler on non-isolated topologies
+            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
+            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
+            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
+        }
+        cluster.setBlacklistedHosts(origBlacklist);
+    }
+
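+    // pop an arbitrary worker spec from the set of specs still waiting for slots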
+    public Set<ExecutorDetails> removeElemFromSet(Set<Set<ExecutorDetails>> executorsSets) {
+        Set<ExecutorDetails> elem = executorsSets.iterator().next();
+        executorsSets.remove(elem);
+        return elem;
+    }
+
+    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
+        Set<String> topologyNames = isoMachines.keySet();
+        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
+        for (TopologyDetails topo : topologies) {
+            if (topologyNames.contains(topo.getName())) {
+                isoTopologies.add(topo);
+            }
+        }
+        return isoTopologies;
+    }
+
+    private Set<String> isolatedTopologyIds(List<TopologyDetails> topologies) {
+        Set<String> ids = new HashSet<String>();
+        if (topologies != null && topologies.size() > 0) {
+            for (TopologyDetails topology : topologies) {
+                ids.add(topology.getId());
+            }
+        }
+        return ids;
+    }
+
+    // map from topology id -> set of sets of executors (one inner set per worker)
+    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
+        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
+        for (TopologyDetails topology : topologies) {
+            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
+        }
+        return workerSpecs;
+    }
+
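+    // host -> list of (worker slot, topology id, executor set) for every assignment in the cluster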
+    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
+        Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values();
+        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
+
+        for (SchedulerAssignment sa : assignmentValues) {
+            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
+            Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
+            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
+                WorkerSlot slot = entry.getKey();
+                List<ExecutorDetails> executors = entry.getValue();
+
+                String host = cluster.getHost(slot.getNodeId());
+                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
+                List<AssignmentInfo> executorList = hostAssignments.get(host);
+                if (executorList == null) {
+                    executorList = new ArrayList<AssignmentInfo>();
+                    hostAssignments.put(host, executorList);
+                }
+                executorList.add(ass);
+            }
+        }
+        return hostAssignments;
+    }
+
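+    // deal the topology's executors round-robin into numWorkers buckets, yielding
+    // one even executors-per-worker distribution for the topology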
+    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
+        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
+
+        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
+        Collection<List<ExecutorDetails>> values = compExecutors.values();
+        for (List<ExecutorDetails> eList : values) {
+            allExecutors.addAll(eList);
+        }
+
+        int numWorkers = topology.getNumWorkers();
+        int bucketIndex = 0;
+        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
+        for (ExecutorDetails executor : allExecutors) {
+            Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
+            if (executors == null) {
+                executors = new HashSet<ExecutorDetails>();
+                bucketExecutors.put(bucketIndex, executors);
+            }
+            executors.add(executor);
+            bucketIndex = (bucketIndex + 1) % numWorkers;
+        }
+
+        return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
+    }
+
+    private Map<String, Map<Integer, Integer>> topologyMachineDistributions(List<TopologyDetails> isoTopologies) {
+        Map<String, Map<Integer, Integer>> machineDistributions = new HashMap<String, Map<Integer, Integer>>();
+        for (TopologyDetails topology : isoTopologies) {
+            machineDistributions.put(topology.getId(), machineDistribution(topology));
+        }
+        return machineDistributions;
+    }
+
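+    // split the topology's workers as evenly as possible across its isolated machines;
+    // Utils.integerDivided returns worker count -> number of machines, e.g. 5 workers
+    // on 2 machines -> {3: 1, 2: 1}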
+    private Map<Integer, Integer> machineDistribution(TopologyDetails topology) {
+        int machineNum = isoMachines.get(topology.getName()).intValue();
+        int workerNum = topology.getNumWorkers();
+        TreeMap<Integer, Integer> distribution = Utils.integerDivided(workerNum, machineNum);
+
+        if (distribution.containsKey(0)) {
+            distribution.remove(0);
+        }
+        return distribution;
+    }
+
+    private boolean checkAssignmentTopology(List<AssignmentInfo> assignments, String topologyId) {
+        for (AssignmentInfo ass : assignments) {
+            if (!topologyId.equals(ass.getTopologyId())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean checkAssignmentWorkerSpecs(List<AssignmentInfo> assignments, Set<Set<ExecutorDetails>> workerSpecs) {
+        for (AssignmentInfo ass : assignments) {
+            if (!workerSpecs.contains(ass.getExecutors())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
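+    // one host with this worker count has been consumed; decrement the remaining count
+    // for that worker count, dropping the entry when it reaches zero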
+    private void decrementDistribution(Map<Integer, Integer> distribution, int value) {
+        Integer distValue = distribution.get(value);
+        if (distValue != null) {
+            int newValue = distValue - 1;
+            if (newValue == 0) {
+                distribution.remove(value);
+            } else {
+                distribution.put(value, newValue);
+            }
+        }
+    }
+
+    private Map<String, Set<WorkerSlot>> hostUsedSlots(Cluster cluster) {
--- End diff --
Will fix.
> port backtype.storm.scheduler.IsolationScheduler to java
> ---------------------------------------------------------
>
> Key: STORM-1239
> URL: https://issues.apache.org/jira/browse/STORM-1239
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Xin Wang
> Labels: java-migration, jstorm-merger
>
> port the isolation scheduler to java
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)