http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java index fc9182d..57ee348 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java @@ -54,11 +54,11 @@ public class ResourceNameNormalizer { return new HashMap<>(); } return new HashMap<>(resourceMap.entrySet().stream() - .collect(Collectors.toMap( - //Map the key if needed - (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()), - //Map the value - (e) -> e.getValue().doubleValue()))); + .collect(Collectors.toMap( + //Map the key if needed + (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()), + //Map the value + (e) -> e.getValue().doubleValue()))); } public Map<String, String> getResourceNameMapping() {
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java index 8b401aa..2e1ca79 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource.strategies.priority; @@ -33,12 +27,44 @@ import org.slf4j.LoggerFactory; public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy { private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class); + protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) { + return new SimulatedUser(u, cluster); + } + + @Override + public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) { + double cpuAvail = cluster.getClusterTotalCpuResource(); + double memAvail = cluster.getClusterTotalMemoryResource(); + + List<TopologyDetails> allUserTopologies = new ArrayList<>(); + List<SimulatedUser> users = new ArrayList<>(); + for (User u : userMap.values()) { + users.add(getSimulatedUserFor(u, cluster)); + } + while (!users.isEmpty()) { + Collections.sort(users, new SimulatedUserComparator(cpuAvail, memAvail)); + SimulatedUser u = users.get(0); + TopologyDetails td = u.getNextHighest(); + if (td == null) { + users.remove(0); + } else { + double score = u.getScore(cpuAvail, memAvail); + td = u.simScheduleNextHighest(); + LOG.info("SIM Scheduling {} with score of {}", td.getId(), score); + cpuAvail -= td.getTotalRequestedCpu(); + memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()); + allUserTopologies.add(td); + } + } + return allUserTopologies; + } + protected static class SimulatedUser { + public final double guaranteedCpu; + public final double guaranteedMemory; protected final LinkedList<TopologyDetails> tds = new LinkedList<>(); private double assignedCpu = 0.0; private double assignedMemory = 0.0; - public final double guaranteedCpu; - public final double guaranteedMemory; public SimulatedUser(User other, ISchedulingState cluster) { tds.addAll(cluster.getTopologies().getTopologiesOwnedBy(other.getId())); @@ -82,8 +108,8 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr } double wouldBeCpu = assignedCpu + td.getTotalRequestedCpu(); double wouldBeMem = assignedMemory + td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap(); - double cpuScore = (wouldBeCpu - guaranteedCpu)/availableCpu; - double memScore = (wouldBeMem - guaranteedMemory)/availableMemory; + double cpuScore = (wouldBeCpu - guaranteedCpu) / availableCpu; + double memScore = (wouldBeMem - guaranteedMemory) / availableMemory; return Math.max(cpuScore, memScore); } @@ -94,38 +120,6 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr } - protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) { - return new SimulatedUser(u, cluster); - } - - @Override - public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) { - double cpuAvail = cluster.getClusterTotalCpuResource(); - double memAvail = cluster.getClusterTotalMemoryResource(); - - List<TopologyDetails> allUserTopologies = new ArrayList<>(); - List<SimulatedUser> users = new ArrayList<>(); - for (User u : userMap.values()) { - users.add(getSimulatedUserFor(u, cluster)); - } - while (!users.isEmpty()) { - Collections.sort(users, new SimulatedUserComparator(cpuAvail, memAvail)); - SimulatedUser u = users.get(0); - TopologyDetails td = u.getNextHighest(); - if (td == null) { - users.remove(0); - } else { - double score = u.getScore(cpuAvail, memAvail); - td = u.simScheduleNextHighest(); - LOG.info("SIM Scheduling {} with score of {}", td.getId(), score); - cpuAvail -= td.getTotalRequestedCpu(); - memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()); - allUserTopologies.add(td); - } - } - return allUserTopologies; - } - private static class SimulatedUserComparator implements Comparator<SimulatedUser> { private final double cpuAvail; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java index 1c76b8d..98b3f4e 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource.strategies.priority; @@ -29,6 +23,11 @@ import org.slf4j.LoggerFactory; public class FIFOSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy { private static final Logger LOG = LoggerFactory.getLogger(FIFOSchedulingPriorityStrategy.class); + @Override + protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) { + return new FIFOSimulatedUser(u, cluster); + } + protected static class FIFOSimulatedUser extends SimulatedUser { public FIFOSimulatedUser(User other, ISchedulingState cluster) { @@ -54,11 +53,6 @@ public class FIFOSchedulingPriorityStrategy extends DefaultSchedulingPriorityStr } } - @Override - protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) { - return new FIFOSimulatedUser(u, cluster); - } - /** * Comparator that sorts topologies by priority and then by submission time * First sort by Topology Priority, if there is a tie for topology priority, topology uptime is used to sort http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java index 77ce0d9..8bb50af 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java @@ -1,26 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource.strategies.priority; import java.util.List; import java.util.Map; - import org.apache.storm.scheduler.ISchedulingState; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.resource.User; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java index bd4d3ce..a474ccc 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java @@ -20,12 +20,10 @@ package org.apache.storm.scheduler.resource.strategies.scheduling; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -47,8 +45,8 @@ import org.slf4j.LoggerFactory; public abstract class BaseResourceAwareStrategy implements IStrategy { private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class); protected Cluster cluster; - private Map<String, List<String>> networkTopography; protected RAS_Nodes nodes; + private Map<String, List<String>> networkTopography; @VisibleForTesting void prepare(Cluster cluster) { @@ -66,42 +64,43 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { /** * Schedule executor exec from topology td. * - * @param exec the executor to schedule - * @param td the topology executor exec is a part of + * @param exec the executor to schedule + * @param td the topology executor exec is a part of * @param scheduledTasks executors that have been scheduled */ protected void scheduleExecutor( - ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, List<ObjectResources> sortedNodes) { + ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, List<ObjectResources> sortedNodes) { WorkerSlot targetSlot = findWorkerForExec(exec, td, sortedNodes); if (targetSlot != null) { RAS_Node targetNode = idToNode(targetSlot.getNodeId()); targetNode.assignSingleExecutor(targetSlot, exec, td); scheduledTasks.add(exec); LOG.debug( - "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on " - + "slot: {} on Rack: {}", - exec, - targetNode.getHostname(), - targetNode.getAvailableMemoryResources(), - targetNode.getAvailableCpuResources(), - targetNode.getTotalMemoryResources(), - targetNode.getTotalCpuResources(), - targetSlot, - nodeToRack(targetNode)); + "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on " + + "slot: {} on Rack: {}", + exec, + targetNode.getHostname(), + targetNode.getAvailableMemoryResources(), + targetNode.getAvailableCpuResources(), + targetNode.getTotalMemoryResources(), + targetNode.getTotalCpuResources(), + targetSlot, + nodeToRack(targetNode)); } else { LOG.error("Not Enough Resources to schedule Task {}", exec); } } protected abstract TreeSet<ObjectResources> sortObjectResources( - final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, - final ExistingScheduleFunc existingScheduleFunc + final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, + final ExistingScheduleFunc existingScheduleFunc ); + /** * Find a worker to schedule executor exec on. * * @param exec the executor to schedule - * @param td the topology that the executor is a part of + * @param td the topology that the executor is a part of * @return a worker to assign exec on. Returns null if a worker cannot be successfully found in cluster */ protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, List<ObjectResources> sortedNodes) { @@ -117,98 +116,24 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { } /** - * interface for calculating the number of existing executors scheduled on a object (rack or - * node). - */ - protected interface ExistingScheduleFunc { - int getNumExistingSchedule(String objectId); - } - - /** - * a class to contain individual object resources as well as cumulative stats. - */ - static class AllResources { - List<ObjectResources> objectResources = new LinkedList<>(); - NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer(); - NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer(); - String identifier; - - public AllResources(String identifier) { - this.identifier = identifier; - } - - public AllResources(AllResources other) { - this (null, - new NormalizedResourceOffer(other.availableResourcesOverall), - new NormalizedResourceOffer(other.totalResourcesOverall), - other.identifier); - List<ObjectResources> objectResourcesList = new ArrayList<>(); - for (ObjectResources objectResource : other.objectResources) { - objectResourcesList.add(new ObjectResources(objectResource)); - } - this.objectResources = objectResourcesList; - } - - public AllResources(List<ObjectResources> objectResources, NormalizedResourceOffer availableResourcesOverall, - NormalizedResourceOffer totalResourcesOverall, String identifier) { - this.objectResources = objectResources; - this.availableResourcesOverall = availableResourcesOverall; - this.totalResourcesOverall = totalResourcesOverall; - this.identifier = identifier; - } - } - - /** - * class to keep track of resources on a rack or node. - */ - static class ObjectResources { - public final String id; - public NormalizedResourceOffer availableResources = new NormalizedResourceOffer(); - public NormalizedResourceOffer totalResources = new NormalizedResourceOffer(); - public double effectiveResources = 0.0; - - public ObjectResources(String id) { - this.id = id; - } - - public ObjectResources(ObjectResources other) { - this(other.id, other.availableResources, other.totalResources, other.effectiveResources); - } - - public ObjectResources(String id, NormalizedResourceOffer availableResources, NormalizedResourceOffer totalResources, - double effectiveResources) { - this.id = id; - this.availableResources = availableResources; - this.totalResources = totalResources; - this.effectiveResources = effectiveResources; - } - - @Override - public String toString() { - return this.id; - } - } - - /** * Nodes are sorted by two criteria. * * <p>1) the number executors of the topology that needs to be scheduled is already on the node in - * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a - * topology on the same node as the existing executors of the topology. + * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the + * existing executors of the topology. * * <p>2) the subordinate/subservient resource availability percentage of a node in descending - * order We calculate the resource availability percentage by dividing the resource availability - * that have exhausted or little of one of the resources mentioned above will be ranked after - * on the node by the resource availability of the entire rack By doing this calculation, nodes - * nodes that have more balanced resource availability. So we will be less likely to pick a node - * that have a lot of one resource but a low amount of another. + * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of + * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this + * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of + * one resource but a low amount of another. * * @param availNodes a list of all the nodes we want to sort - * @param rackId the rack id availNodes are a part of + * @param rackId the rack id availNodes are a part of * @return a sorted list of nodes. */ protected TreeSet<ObjectResources> sortNodes( - List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId) { + List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId) { AllResources allResources = new AllResources("RACK"); List<ObjectResources> nodes = allResources.objectResources; @@ -264,7 +189,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { for (ObjectResources rack : sortedRacks) { final String rackId = rack.id; TreeSet<ObjectResources> sortedNodes = sortNodes( - getAvailableNodesFromRack(rackId), exec, td, rackId); + getAvailableNodesFromRack(rackId), exec, td, rackId); totallySortedNodes.addAll(sortedNodes); } //Now do some post processing to add make some nodes preferred over others. @@ -304,14 +229,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { * Racks are sorted by two criteria. * * <p>1) the number executors of the topology that needs to be scheduled is already on the rack in descending order. - * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the - * existing executors of the topology. + * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the existing executors of the + * topology. * * <p>2) the subordinate/subservient resource availability percentage of a rack in descending order We calculate - * the resource availability percentage by dividing the resource availability on the rack by the resource - * availability of the entire cluster By doing this calculation, racks that have exhausted or little of one of - * the resources mentioned above will be ranked after racks that have more balanced resource availability. So we - * will be less likely to pick a rack that have a lot of one resource but a low amount of another. + * the resource availability percentage by dividing the resource availability on the rack by the resource availability of the entire + * cluster By doing this calculation, racks that have exhausted or little of one of the resources mentioned above will be ranked after + * racks that have more balanced resource availability. So we will be less likely to pick a rack that have a lot of one resource but a + * low amount of another. * * @return a sorted list of racks */ @@ -369,7 +294,6 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { }); } - /** * Get the rack on which a node is a part of. * @@ -437,7 +361,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { /** * Sort a component's neighbors by the number of connections it needs to make with this component. * - * @param thisComp the component that we need to sort its neighbors + * @param thisComp the component that we need to sort its neighbors * @param componentMap all the components to sort * @return a sorted set of components */ @@ -460,15 +384,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { } /** - * Order executors based on how many in and out connections it will potentially need to make, in descending order. - * First order components by the number of in and out connections it will have. Then iterate through the sorted list of components. - * For each component sort the neighbors of that component by how many connections it will have to make with that component. - * Add an executor from this component and then from each neighboring component in sorted order. - * Do this until there is nothing left to schedule. + * Order executors based on how many in and out connections it will potentially need to make, in descending order. First order + * components by the number of in and out connections it will have. Then iterate through the sorted list of components. For each + * component sort the neighbors of that component by how many connections it will have to make with that component. Add an executor from + * this component and then from each neighboring component in sorted order. Do this until there is nothing left to schedule. * - * @param td The topology the executors belong to - * @param unassignedExecutors a collection of unassigned executors that need to be unassigned. Should only try to - * assign executors from this list + * @param td The topology the executors belong to + * @param unassignedExecutors a collection of unassigned executors that need to be unassigned. Should only try to assign executors from + * this list * @return a list of executors in sorted order */ protected List<ExecutorDetails> orderExecutors( @@ -596,4 +519,76 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { } return ret; } + + /** + * interface for calculating the number of existing executors scheduled on a object (rack or node). + */ + protected interface ExistingScheduleFunc { + int getNumExistingSchedule(String objectId); + } + + /** + * a class to contain individual object resources as well as cumulative stats. + */ + static class AllResources { + List<ObjectResources> objectResources = new LinkedList<>(); + NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer(); + NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer(); + String identifier; + + public AllResources(String identifier) { + this.identifier = identifier; + } + + public AllResources(AllResources other) { + this(null, + new NormalizedResourceOffer(other.availableResourcesOverall), + new NormalizedResourceOffer(other.totalResourcesOverall), + other.identifier); + List<ObjectResources> objectResourcesList = new ArrayList<>(); + for (ObjectResources objectResource : other.objectResources) { + objectResourcesList.add(new ObjectResources(objectResource)); + } + this.objectResources = objectResourcesList; + } + + public AllResources(List<ObjectResources> objectResources, NormalizedResourceOffer availableResourcesOverall, + NormalizedResourceOffer totalResourcesOverall, String identifier) { + this.objectResources = objectResources; + this.availableResourcesOverall = availableResourcesOverall; + this.totalResourcesOverall = totalResourcesOverall; + this.identifier = identifier; + } + } + + /** + * class to keep track of resources on a rack or node. + */ + static class ObjectResources { + public final String id; + public NormalizedResourceOffer availableResources = new NormalizedResourceOffer(); + public NormalizedResourceOffer totalResources = new NormalizedResourceOffer(); + public double effectiveResources = 0.0; + + public ObjectResources(String id) { + this.id = id; + } + + public ObjectResources(ObjectResources other) { + this(other.id, other.availableResources, other.totalResources, other.effectiveResources); + } + + public ObjectResources(String id, NormalizedResourceOffer availableResources, NormalizedResourceOffer totalResources, + double effectiveResources) { + this.id = id; + this.availableResources = availableResources; + this.totalResources = totalResources; + this.effectiveResources = effectiveResources; + } + + @Override + public String toString() { + return this.id; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java index f790050..12f89ca 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource.strategies.scheduling; @@ -47,155 +41,201 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { + //hard coded max number of states to search + public static final int MAX_STATE_SEARCH = 100_000; + public static final int DEFAULT_STATE_SEARCH = 10_000; private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + private Map<String, RAS_Node> nodes; + private Map<ExecutorDetails, String> execToComp; + private Map<String, Set<ExecutorDetails>> compToExecs; + private List<String> favoredNodes; + private List<String> unFavoredNodes; - protected static class SolverResult { - private final int statesSearched; - private final boolean success; - private final long timeTakenMillis; - private final int backtracked; + //constraints and spreads + private Map<String, Map<String, Integer>> constraintMatrix; + private HashSet<String> spreadComps = new HashSet<>(); - public SolverResult(SearcherState state, boolean success) { - this.statesSearched = state.getStatesSearched(); - this.success = success; - timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; - backtracked = state.numBacktrack; + static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) { + Map<String, Map<String, Integer>> matrix = new HashMap<>(); + for (String comp : comps) { + matrix.put(comp, new HashMap<>()); + for (String comp2 : comps) { + matrix.get(comp).put(comp2, 0); + } } - - public SchedulingResult asSchedulingResult() { - if (success) { - return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched - + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); + List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS); + if (constraints != null) { + for (List<String> constraintPair : constraints) { + String comp1 = constraintPair.get(0); + String comp2 = constraintPair.get(1); + if (!matrix.containsKey(comp1)) { + LOG.warn("Comp: {} declared in constraints is not valid!", comp1); + continue; + } + if (!matrix.containsKey(comp2)) { + LOG.warn("Comp: {} declared in constraints is not valid!", comp2); + continue; + } + matrix.get(comp1).put(comp2, 1); + matrix.get(comp2).put(comp1, 1); } - return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, - "Cannot find scheduling that satisfies all constraints (" + statesSearched - + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); } + return matrix; } - protected static class SearcherState { - // Metrics - // How many states searched so far. - private int statesSearched = 0; - // Number of times we had to backtrack. - private int numBacktrack = 0; - final long startTimeMillis; - private final long maxEndTimeMs; - - // Current state - // The current executor we are trying to schedule - private int execIndex = 0; - // A map of the worker to the components in the worker to be able to enforce constraints. - private final Map<WorkerSlot, Set<String>> workerCompAssignment; - private final boolean[] okToRemoveFromWorker; - // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints - private final Map<RAS_Node, Set<String>> nodeCompAssignment; - private final boolean[] okToRemoveFromNode; - - // Static State - // The list of all executors (preferably sorted to make assignments simpler). - private final List<ExecutorDetails> execs; - //The maximum number of state to search before stopping. - private final int maxStatesSearched; - //The topology we are scheduling - private final TopologyDetails td; + /** + * Determines if a scheduling is valid and all constraints are satisfied. + */ + @VisibleForTesting + public static boolean validateSolution(Cluster cluster, TopologyDetails td) { + return checkSpreadSchedulingValid(cluster, td) + && checkConstraintsSatisfied(cluster, td) + && checkResourcesCorrect(cluster, td); + } - private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment, - int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) { - assert !execs.isEmpty(); - assert execs != null; + /** + * Check if constraints are satisfied. + */ + private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking constraints..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent(); + //get topology constraints + Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values())); - this.workerCompAssignment = workerCompAssignment; - this.nodeCompAssignment = nodeCompAssignment; - this.maxStatesSearched = maxStatesSearched; - this.execs = execs; - okToRemoveFromWorker = new boolean[execs.size()]; - okToRemoveFromNode = new boolean[execs.size()]; - this.td = td; - startTimeMillis = Time.currentTimeMillis(); - if (maxTimeMs <= 0) { - maxEndTimeMs = Long.MAX_VALUE; - } else { - maxEndTimeMs = startTimeMillis + maxTimeMs; + Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>(); + result.forEach((exec, worker) -> { + String comp = execToComp.get(exec); + workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp); + }); + for (Map.Entry<WorkerSlot, Set<String>> entry : workerCompMap.entrySet()) { + Set<String> comps = entry.getValue(); + for (String comp1 : comps) { + for (String comp2 : comps) { + if (!comp1.equals(comp2) && constraintMatrix.get(comp1).get(comp2) != 0) { + LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}", + comp1, comp2, entry.getKey()); + return false; + } + } } } + return true; + } - public void incStatesSearched() { - statesSearched++; - if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) { - LOG.debug("States Searched: {}", statesSearched); - LOG.debug("backtrack: {}", numBacktrack); + private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) { + Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>(); + for (RAS_Node node : RAS_Nodes.getAllNodesFrom(cluster).values()) { + for (WorkerSlot s : node.getUsedSlots()) { + workerToNodes.put(s, node); } } + return workerToNodes; + } - public int getStatesSearched() { - return statesSearched; - } + private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking for a valid scheduling..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent(); + Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>(); + Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>(); + Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>(); + Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster); + boolean ret = true; - public boolean areSearchLimitsExceeded() { - return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs; - } + HashSet<String> spreadComps = getSpreadComps(topo); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) { + ExecutorDetails exec = entry.getKey(); + WorkerSlot worker = entry.getValue(); + RAS_Node node = workerToNodes.get(worker); - public SearcherState nextExecutor() { - execIndex++; - if (execIndex >= execs.size()) { - throw new IllegalStateException("Internal Error: exceeded the exec limit " + execIndex + " >= " + execs.size()); + if (workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).contains(exec)) { + LOG.error("Incorrect Scheduling: Found duplicate in scheduling"); + return false; } - return this; + workerExecMap.get(worker).add(exec); + String comp = execToComp.get(exec); + workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp); + if (spreadComps.contains(comp)) { + if (nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).contains(comp)) { + LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}", + comp, exec, node.getId(), nodeCompMap.get(node)); + ret = false; + } + } + nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp); } + return ret; + } - public boolean areAllExecsScheduled() { - return execIndex == execs.size() - 1; + /** + * Check if resource constraints satisfied. + */ + private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking Resources..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>(); + Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>(); + Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster); + //merge with existing assignments + if (cluster.getAssignmentById(topo.getId()) != null + && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) { + mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot()); } + mergedExecToWorker.putAll(result); - public ExecutorDetails currentExec() { - return execs.get(execIndex); - } + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) { + ExecutorDetails exec = entry.getKey(); + WorkerSlot worker = entry.getValue(); + RAS_Node node = nodes.get(worker.getNodeId()); - public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) { - ExecutorDetails exec = currentExec(); - String comp = execToComp.get(exec); - LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot); - //It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it - okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp); - okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp); - node.assignSingleExecutor(workerSlot, exec, td); + if (node.getAvailableMemoryResources() < 0.0 && node.getAvailableCpuResources() < 0.0) { + LOG.error("Incorrect Scheduling: found node with negative available resources"); + return false; + } + nodeToExecs.computeIfAbsent(node, (k) -> new HashSet<>()).add(exec); } - public void backtrack(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) { - execIndex--; - if (execIndex < 0) { - throw new IllegalStateException("Internal Error: exec index became negative"); + for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) { + RAS_Node node = entry.getKey(); + Collection<ExecutorDetails> execs = entry.getValue(); + double cpuUsed = 0.0; + double memoryUsed = 0.0; + for (ExecutorDetails exec : execs) { + cpuUsed += topo.getTotalCpuReqTask(exec); + memoryUsed += topo.getTotalMemReqTask(exec); } - numBacktrack++; - ExecutorDetails exec = currentExec(); - String comp = execToComp.get(exec); - LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot); - if (okToRemoveFromWorker[execIndex]) { - workerCompAssignment.get(workerSlot).remove(comp); - okToRemoveFromWorker[execIndex] = false; + if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) { + LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}" + + " Actual: {} Executors scheduled on node: {}", + node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs); + return false; } - if (okToRemoveFromNode[execIndex]) { - nodeCompAssignment.get(node).remove(comp); - okToRemoveFromNode[execIndex] = false; + if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) { + LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}" + + " Actual: {} Executors scheduled on node: {}", + node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs); + return false; } - node.freeSingleExecutor(exec, td); } + return true; } - private Map<String, RAS_Node> nodes; - private Map<ExecutorDetails, String> execToComp; - private Map<String, Set<ExecutorDetails>> compToExecs; - private List<String> favoredNodes; - private List<String> unFavoredNodes; - - //constraints and spreads - private Map<String, Map<String, Integer>> constraintMatrix; - private HashSet<String> spreadComps = new HashSet<>(); - - //hard coded max number of states to search - public static final int MAX_STATE_SEARCH = 100_000; - public static final int DEFAULT_STATE_SEARCH = 10_000; + private static HashSet<String> getSpreadComps(TopologyDetails topo) { + HashSet<String> retSet = new HashSet<>(); + List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); + if (spread != null) { + Set<String> comps = topo.getComponents().keySet(); + for (String comp : spread) { + if (comps.contains(comp)) { + retSet.add(comp); + } else { + LOG.warn("Comp {} declared for spread not valid", comp); + } + } + } + return retSet; + } @Override public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { @@ -206,7 +246,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>(); //set max number of states to search final int maxStateSearch = Math.min(MAX_STATE_SEARCH, - ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH), DEFAULT_STATE_SEARCH)); + ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH), + DEFAULT_STATE_SEARCH)); final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L; @@ -228,8 +269,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { //get a sorted list of unassigned executors based on number of constraints Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td)); List<ExecutorDetails> sortedExecs = getSortedExecs(spreadComps, constraintMatrix, compToExecs).stream() - .filter(unassignedExecutors::contains) - .collect(Collectors.toList()); + .filter(unassignedExecutors::contains) + .collect(Collectors.toList()); //populate with existing assignments SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId()); @@ -258,18 +299,18 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { int numExecs = compToExecs.get(comp).size(); if (numExecs > nodes.size()) { LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger " - + "than number of nodes: {}", comp, numExecs, nodes.size()); + + "than number of nodes: {}", comp, numExecs, nodes.size()); return false; } } if (execToComp.size() >= MAX_STATE_SEARCH) { LOG.error("Number of executors is greater than the maximum number of states allowed to be searched. " - + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH); + + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH); return false; } return true; } - + @Override protected TreeSet<ObjectResources> sortObjectResources( final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, @@ -289,233 +330,67 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { ExecutorDetails exec = state.currentExec(); List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes); - for (ObjectResources nodeResources: sortedNodes) { + for (ObjectResources nodeResources : sortedNodes) { RAS_Node node = nodes.get(nodeResources.id); for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) { if (isExecAssignmentToWorkerValid(workerSlot, state)) { - state.tryToSchedule(execToComp, node, workerSlot); - - if (state.areAllExecsScheduled()) { - //Everything is scheduled correctly, so no need to search any more. - return new SolverResult(state, true); - } - - SolverResult results = backtrackSearch(state.nextExecutor()); - if (results.success) { - //We found a good result we are done. - return results; - } - - if (state.areSearchLimitsExceeded()) { - //No need to search more it is not going to help. - return new SolverResult(state, false); - } - - //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling) - state.backtrack(execToComp, node, workerSlot); - } - } - } - //Tried all of the slots and none of them worked. - return new SolverResult(state, false); - } - - /** - * Check if any constraints are violated if exec is scheduled on worker. - * @return true if scheduling exec on worker does not violate any constraints, returns false if it does - */ - public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) { - final ExecutorDetails exec = state.currentExec(); - //check resources - RAS_Node node = nodes.get(worker.getNodeId()); - if (!node.wouldFit(worker, exec, state.td)) { - LOG.trace("{} would not fit in resources available on {}", exec, worker); - return false; - } - - //check if exec can be on worker based on user defined component exclusions - String execComp = execToComp.get(exec); - Set<String> components = state.workerCompAssignment.get(worker); - if (components != null) { - Map<String, Integer> subMatrix = constraintMatrix.get(execComp); - for (String comp : components) { - if (subMatrix.get(comp) != 0) { - LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker); - return false; - } - } - } - - //check if exec satisfy spread - if (spreadComps.contains(execComp)) { - if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).contains(execComp)) { - LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId()); - return false; - } - } - return true; - } - - static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) { - Map<String, Map<String, Integer>> matrix = new HashMap<>(); - for (String comp : comps) { - matrix.put(comp, new HashMap<>()); - for (String comp2 : comps) { - matrix.get(comp).put(comp2, 0); - } - } - List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS); - if (constraints != null) { - for (List<String> constraintPair : constraints) { - String comp1 = constraintPair.get(0); - String comp2 = constraintPair.get(1); - if (!matrix.containsKey(comp1)) { - LOG.warn("Comp: {} declared in constraints is not valid!", comp1); - continue; - } - if (!matrix.containsKey(comp2)) { - LOG.warn("Comp: {} declared in constraints is not valid!", comp2); - continue; - } - matrix.get(comp1).put(comp2, 1); - matrix.get(comp2).put(comp1, 1); - } - } - return matrix; - } - - /** - * Determines if a scheduling is valid and all constraints are satisfied. - */ - @VisibleForTesting - public static boolean validateSolution(Cluster cluster, TopologyDetails td) { - return checkSpreadSchedulingValid(cluster, td) - && checkConstraintsSatisfied(cluster, td) - && checkResourcesCorrect(cluster, td); - } - - /** - * Check if constraints are satisfied. - */ - private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) { - LOG.info("Checking constraints..."); - Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); - Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent(); - //get topology constraints - Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values())); - - Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>(); - result.forEach((exec, worker) -> { - String comp = execToComp.get(exec); - workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp); - }); - for (Map.Entry<WorkerSlot, Set<String>> entry : workerCompMap.entrySet()) { - Set<String> comps = entry.getValue(); - for (String comp1 : comps) { - for (String comp2: comps) { - if (!comp1.equals(comp2) && constraintMatrix.get(comp1).get(comp2) != 0) { - LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}", - comp1, comp2, entry.getKey()); - return false; - } - } - } - } - return true; - } + state.tryToSchedule(execToComp, node, workerSlot); - private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) { - Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>(); - for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) { - for (WorkerSlot s : node.getUsedSlots()) { - workerToNodes.put(s, node); - } - } - return workerToNodes; - } + if (state.areAllExecsScheduled()) { + //Everything is scheduled correctly, so no need to search any more. + return new SolverResult(state, true); + } - private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) { - LOG.info("Checking for a valid scheduling..."); - Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); - Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent(); - Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>(); - Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>(); - Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>(); - Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster); - boolean ret = true; + SolverResult results = backtrackSearch(state.nextExecutor()); + if (results.success) { + //We found a good result we are done. + return results; + } - HashSet<String> spreadComps = getSpreadComps(topo); - for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) { - ExecutorDetails exec = entry.getKey(); - WorkerSlot worker = entry.getValue(); - RAS_Node node = workerToNodes.get(worker); + if (state.areSearchLimitsExceeded()) { + //No need to search more it is not going to help. + return new SolverResult(state, false); + } - if (workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).contains(exec)) { - LOG.error("Incorrect Scheduling: Found duplicate in scheduling"); - return false; - } - workerExecMap.get(worker).add(exec); - String comp = execToComp.get(exec); - workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp); - if (spreadComps.contains(comp)) { - if (nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).contains(comp)) { - LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}", - comp, exec, node.getId(), nodeCompMap.get(node)); - ret = false; + //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling) + state.backtrack(execToComp, node, workerSlot); } } - nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp); } - return ret; + //Tried all of the slots and none of them worked. + return new SolverResult(state, false); } /** - * Check if resource constraints satisfied. + * Check if any constraints are violated if exec is scheduled on worker. + * @return true if scheduling exec on worker does not violate any constraints, returns false if it does */ - private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) { - LOG.info("Checking Resources..."); - Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); - Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>(); - Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>(); - Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster); - //merge with existing assignments - if (cluster.getAssignmentById(topo.getId()) != null - && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) { - mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot()); + public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) { + final ExecutorDetails exec = state.currentExec(); + //check resources + RAS_Node node = nodes.get(worker.getNodeId()); + if (!node.wouldFit(worker, exec, state.td)) { + LOG.trace("{} would not fit in resources available on {}", exec, worker); + return false; } - mergedExecToWorker.putAll(result); - - for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) { - ExecutorDetails exec = entry.getKey(); - WorkerSlot worker = entry.getValue(); - RAS_Node node = nodes.get(worker.getNodeId()); - if (node.getAvailableMemoryResources() < 0.0 && node.getAvailableCpuResources() < 0.0) { - LOG.error("Incorrect Scheduling: found node with negative available resources"); - return false; + //check if exec can be on worker based on user defined component exclusions + String execComp = execToComp.get(exec); + Set<String> components = state.workerCompAssignment.get(worker); + if (components != null) { + Map<String, Integer> subMatrix = constraintMatrix.get(execComp); + for (String comp : components) { + if (subMatrix.get(comp) != 0) { + LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker); + return false; + } } - nodeToExecs.computeIfAbsent(node, (k) -> new HashSet<>()).add(exec); } - for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) { - RAS_Node node = entry.getKey(); - Collection<ExecutorDetails> execs = entry.getValue(); - double cpuUsed = 0.0; - double memoryUsed = 0.0; - for (ExecutorDetails exec : execs) { - cpuUsed += topo.getTotalCpuReqTask(exec); - memoryUsed += topo.getTotalMemReqTask(exec); - } - if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) { - LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}" - + " Actual: {} Executors scheduled on node: {}", - node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs); - return false; - } - if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) { - LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}" - + " Actual: {} Executors scheduled on node: {}", - node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs); + //check if exec satisfy spread + if (spreadComps.contains(execComp)) { + if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).contains(execComp)) { + LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId()); return false; } } @@ -551,22 +426,6 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { return retList; } - private static HashSet<String> getSpreadComps(TopologyDetails topo) { - HashSet<String> retSet = new HashSet<>(); - List<String> spread = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); - if (spread != null) { - Set<String> comps = topo.getComponents().keySet(); - for (String comp : spread) { - if (comps.contains(comp)) { - retSet.add(comp); - } else { - LOG.warn("Comp {} declared for spread not valid", comp); - } - } - } - return retSet; - } - /** * Used to sort a Map by the values. */ @@ -584,4 +443,136 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { sortedByValues.putAll(map); return sortedByValues; } + + protected static class SolverResult { + private final int statesSearched; + private final boolean success; + private final long timeTakenMillis; + private final int backtracked; + + public SolverResult(SearcherState state, boolean success) { + this.statesSearched = state.getStatesSearched(); + this.success = success; + timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; + backtracked = state.numBacktrack; + } + + public SchedulingResult asSchedulingResult() { + if (success) { + return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched + + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); + } + return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, + "Cannot find scheduling that satisfies all constraints (" + statesSearched + + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); + } + } + + protected static class SearcherState { + final long startTimeMillis; + private final long maxEndTimeMs; + // A map of the worker to the components in the worker to be able to enforce constraints. + private final Map<WorkerSlot, Set<String>> workerCompAssignment; + private final boolean[] okToRemoveFromWorker; + // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints + private final Map<RAS_Node, Set<String>> nodeCompAssignment; + private final boolean[] okToRemoveFromNode; + // Static State + // The list of all executors (preferably sorted to make assignments simpler). + private final List<ExecutorDetails> execs; + //The maximum number of state to search before stopping. + private final int maxStatesSearched; + //The topology we are scheduling + private final TopologyDetails td; + // Metrics + // How many states searched so far. + private int statesSearched = 0; + // Number of times we had to backtrack. + private int numBacktrack = 0; + // Current state + // The current executor we are trying to schedule + private int execIndex = 0; + + private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment, + int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) { + assert !execs.isEmpty(); + assert execs != null; + + this.workerCompAssignment = workerCompAssignment; + this.nodeCompAssignment = nodeCompAssignment; + this.maxStatesSearched = maxStatesSearched; + this.execs = execs; + okToRemoveFromWorker = new boolean[execs.size()]; + okToRemoveFromNode = new boolean[execs.size()]; + this.td = td; + startTimeMillis = Time.currentTimeMillis(); + if (maxTimeMs <= 0) { + maxEndTimeMs = Long.MAX_VALUE; + } else { + maxEndTimeMs = startTimeMillis + maxTimeMs; + } + } + + public void incStatesSearched() { + statesSearched++; + if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) { + LOG.debug("States Searched: {}", statesSearched); + LOG.debug("backtrack: {}", numBacktrack); + } + } + + public int getStatesSearched() { + return statesSearched; + } + + public boolean areSearchLimitsExceeded() { + return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs; + } + + public SearcherState nextExecutor() { + execIndex++; + if (execIndex >= execs.size()) { + throw new IllegalStateException("Internal Error: exceeded the exec limit " + execIndex + " >= " + execs.size()); + } + return this; + } + + public boolean areAllExecsScheduled() { + return execIndex == execs.size() - 1; + } + + public ExecutorDetails currentExec() { + return execs.get(execIndex); + } + + public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) { + ExecutorDetails exec = currentExec(); + String comp = execToComp.get(exec); + LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot); + //It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it + okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp); + okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp); + node.assignSingleExecutor(workerSlot, exec, td); + } + + public void backtrack(Map<ExecutorDetails, String> execToComp, RAS_Node node, WorkerSlot workerSlot) { + execIndex--; + if (execIndex < 0) { + throw new IllegalStateException("Internal Error: exec index became negative"); + } + numBacktrack++; + ExecutorDetails exec = currentExec(); + String comp = execToComp.get(exec); + LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot); + if (okToRemoveFromWorker[execIndex]) { + workerCompAssignment.get(workerSlot).remove(comp); + okToRemoveFromWorker[execIndex] = false; + } + if (okToRemoveFromNode[execIndex]) { + nodeCompAssignment.get(node).remove(comp); + okToRemoveFromNode[execIndex] = false; + } + node.freeSingleExecutor(exec, td); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java index efa3ecf..826c24d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java @@ -43,10 +43,10 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl if (nodes.getNodes().size() <= 0) { LOG.warn("No available nodes to schedule tasks on!"); return SchedulingResult.failure( - SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); + SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); } Collection<ExecutorDetails> unassignedExecutors = - new HashSet<>(this.cluster.getUnassignedExecutors(td)); + new HashSet<>(this.cluster.getUnassignedExecutors(td)); LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors); Collection<ExecutorDetails> scheduledTasks = new ArrayList<>(); List<Component> spouts = this.getSpouts(td); @@ -54,7 +54,7 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl if (spouts.size() == 0) { LOG.error("Cannot find a Spout!"); return SchedulingResult.failure( - SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); + SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); } //order executors to be scheduled @@ -66,10 +66,10 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl for (ExecutorDetails exec : orderedExecutors) { LOG.debug( - "Attempting to schedule: {} of component {}[ REQ {} ]", - exec, - td.getExecutorToComponent().get(exec), - td.getTaskResourceReqList(exec)); + "Attempting to schedule: {} of component {}[ REQ {} ]", + exec, + td.getExecutorToComponent().get(exec), + td.getTaskResourceReqList(exec)); scheduleExecutor(exec, td, scheduledTasks, sortedNodes); } @@ -85,12 +85,12 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl if (executorsNotScheduled.size() > 0) { LOG.error("Not all executors successfully scheduled: {}", executorsNotScheduled); result = - SchedulingResult.failure( - SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, - (td.getExecutors().size() - unassignedExecutors.size()) - + "/" - + td.getExecutors().size() - + " executors scheduled"); + SchedulingResult.failure( + SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, + (td.getExecutors().size() - unassignedExecutors.size()) + + "/" + + td.getExecutors().size() + + " executors scheduled"); } else { LOG.debug("All resources successfully scheduled!"); result = SchedulingResult.success("Fully Scheduled by " + this.getClass().getSimpleName()); @@ -99,64 +99,61 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl } /** - * Sort objects by the following two criteria. 1) the number executors of the topology that needs - * to be scheduled is already on the object (node or rack) in descending order. The reasoning to - * sort based on criterion 1 is so we schedule the rest of a topology on the same object (node or - * rack) as the existing executors of the topology. 2) the subordinate/subservient resource - * availability percentage of a rack in descending order We calculate the resource availability - * percentage by dividing the resource availability of the object (node or rack) by the resource - * availability of the entire rack or cluster depending on if object references a node or a rack. - * By doing this calculation, objects (node or rack) that have exhausted or little of one of the - * resources mentioned above will be ranked after racks that have more balanced resource - * availability. So we will be less likely to pick a rack that have a lot of one resource but a - * low amount of another. + * Sort objects by the following two criteria. 1) the number executors of the topology that needs to be scheduled is already on the + * object (node or rack) in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the + * same object (node or rack) as the existing executors of the topology. 2) the subordinate/subservient resource availability percentage + * of a rack in descending order We calculate the resource availability percentage by dividing the resource availability of the object + * (node or rack) by the resource availability of the entire rack or cluster depending on if object references a node or a rack. By + * doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned above will be ranked + * after racks that have more balanced resource availability. So we will be less likely to pick a rack that have a lot of one resource + * but a low amount of another. * - * @param allResources contains all individual ObjectResources as well as cumulative stats + * @param allResources contains all individual ObjectResources as well as cumulative stats * @param existingScheduleFunc a function to get existing executors already scheduled on this object * @return a sorted list of ObjectResources */ @Override protected TreeSet<ObjectResources> sortObjectResources( - final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, - final ExistingScheduleFunc existingScheduleFunc) { - + final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, + final ExistingScheduleFunc existingScheduleFunc) { + for (ObjectResources objectResources : allResources.objectResources) { objectResources.effectiveResources = allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources); if (LOG.isTraceEnabled()) { LOG.trace("Effective resources for {} is {}, and numExistingSchedule is {}", - objectResources.id, objectResources.effectiveResources, - existingScheduleFunc.getNumExistingSchedule(objectResources.id)); + objectResources.id, objectResources.effectiveResources, + existingScheduleFunc.getNumExistingSchedule(objectResources.id)); } } TreeSet<ObjectResources> sortedObjectResources = - new TreeSet<>((o1, o2) -> { - int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id); - int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id); - if (execsScheduled1 > execsScheduled2) { + new TreeSet<>((o1, o2) -> { + int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id); + int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id); + if (execsScheduled1 > execsScheduled2) { + return -1; + } else if (execsScheduled1 < execsScheduled2) { + return 1; + } else { + if (o1.effectiveResources > o2.effectiveResources) { return -1; - } else if (execsScheduled1 < execsScheduled2) { + } else if (o1.effectiveResources < o2.effectiveResources) { return 1; } else { - if (o1.effectiveResources > o2.effectiveResources) { + double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); + double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); + + if (o1Avg > o2Avg) { return -1; - } else if (o1.effectiveResources < o2.effectiveResources) { + } else if (o1Avg < o2Avg) { return 1; } else { - double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); - double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); - - if (o1Avg > o2Avg) { - return -1; - } else if (o1Avg < o2Avg) { - return 1; - } else { - return o1.id.compareTo(o2.id); - } + return o1.id.compareTo(o2.id); } } - }); + } + }); sortedObjectResources.addAll(allResources.objectResources); LOG.debug("Sorted Object Resources: {}", sortedObjectResources); return sortedObjectResources;
