Repository: storm Updated Branches: refs/heads/master 3cf71d2c6 -> becab7cc2
http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java new file mode 100644 index 0000000..926184c --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ObjectReader; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A request that has been normalized. + */ +public class NormalizedResourceRequest extends NormalizedResources { + private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class); + + private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) { + if (!dest.containsKey(destKey)) { + Number value = (Number)src.get(srcKey); + if (value != null) { + dest.put(destKey, value.doubleValue()); + } + } + } + + private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) { + Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault( + Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>())); + putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); + putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); + putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); + return ret; + } + + private static Map<String, Double> parseResources(String input) { + Map<String, Double> topologyResources = new HashMap<>(); + JSONParser parser = new JSONParser(); + LOG.debug("Input to parseResources {}", input); + try { + if (input 
!= null) { + Object obj = parser.parse(input); + JSONObject jsonObject = (JSONObject) obj; + + // Legacy resource parsing + if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { + Double topoMemOnHeap = ObjectReader + .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); + topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap); + } + if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { + Double topoMemOffHeap = ObjectReader + .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); + topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap); + } + if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { + Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), + null); + topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu); + } + + // If resource is also present in resources map will overwrite the above + if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { + Map<String, Number> rawResourcesMap = + (Map<String, Number>) jsonObject.computeIfAbsent( + Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>()); + + for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) { + topologyResources.put( + stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue()); + } + + + } + } + } catch (ParseException e) { + LOG.error("Failed to parse component resources is:" + e.toString(), e); + return null; + } + return topologyResources; + } + + private double onHeap; + private double offHeap; + + /** + * Create a new normalized set of resources. Note that memory is not covered here becasue it is not consistent in requests vs offers + * because of how on heap vs off heap is used. + * + * @param resources the resources to be normalized. 
+ * @param topologyConf the config for the topology + */ + private NormalizedResourceRequest(Map<String, ? extends Number> resources, + Map<String, Object> topologyConf) { + super(resources, getDefaultResources(topologyConf)); + } + + public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) { + this(parseResources(component.get_json_conf()), topoConf); + } + + public NormalizedResourceRequest(Map<String, Object> topoConf) { + this((Map<String, ? extends Number>) null, topoConf); + } + + public NormalizedResourceRequest() { + super(null, null); + } + + @Override + public Map<String,Double> toNormalizedMap() { + Map<String, Double> ret = super.toNormalizedMap(); + ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap); + ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap); + return ret; + } + + @Override + protected void initializeMemory(Map<String, Double> normalizedResources) { + onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); + offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); + } + + public double getOnHeapMemoryMb() { + return onHeap; + } + + public void addOnHeap(final double onHeap) { + this.onHeap += onHeap; + } + + public double getOffHeapMemoryMb() { + return offHeap; + } + + public void addOffHeap(final double offHeap) { + this.offHeap += offHeap; + } + + /** + * Add the resources in other to this. + * @param other the other Request to add to this. 
+ */ + public void add(NormalizedResourceRequest other) { + super.add(other); + onHeap += other.onHeap; + offHeap += other.offHeap; + } + + @Override + public void add(WorkerResources value) { + super.add(value); + //The resources are already normalized + Map<String, Double> resources = value.get_resources(); + onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); + offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); + } + + @Override + public double getTotalMemoryMb() { + return getOnHeapMemoryMb() + getOffHeapMemoryMb(); + } + + @Override + public String toString() { + return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java new file mode 100644 index 0000000..8ed1a57 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.resource; + +import static org.apache.storm.Constants.*; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.generated.WorkerResources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Resources that have been normalized. + */ +public abstract class NormalizedResources { + private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class); + public static final Map<String, String> RESOURCE_NAME_MAPPING; + + static { + Map<String, String> tmp = new HashMap<>(); + tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME); + tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME); + tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME); + tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME); + tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME); + RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp); + } + + private static double[] makeArray(Map<String, Double> normalizedResources) { + //To avoid locking we will go through the map twice. 
It should be small so it is probably not a big deal + for (String key : normalizedResources.keySet()) { + //We are going to skip over CPU and Memory, because they are captured elsewhere + if (!COMMON_CPU_RESOURCE_NAME.equals(key) + && !COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key) + && !COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key) + && !COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) { + resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement()); + } + } + //By default all of the values are 0 + double [] ret = new double[counter.get()]; + for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) { + Integer index = resourceNames.get(entry.getKey()); + if (index != null) { + //index == null if it is memory or CPU + ret[index] = entry.getValue(); + } + } + return ret; + } + + private static final ConcurrentMap<String, Integer> resourceNames = new ConcurrentHashMap<>(); + private static final AtomicInteger counter = new AtomicInteger(0); + private double cpu; + private double[] otherResources; + + public NormalizedResources(NormalizedResources other) { + cpu = other.cpu; + otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length); + } + + /** + * Create a new normalized set of resources. Note that memory is not + * covered here because it is not consistent in requests vs offers because + * of how on heap vs off heap is used. + * @param resources the resources to be normalized. + * @param defaults the default resources that will also be normalized and combined with the real resources. + */ + public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? 
extends Number> defaults) { + Map<String, Double> normalizedResources = normalizedResourceMap(defaults); + normalizedResources.putAll(normalizedResourceMap(resources)); + cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0); + otherResources = makeArray(normalizedResources); + initializeMemory(normalizedResources); + } + + /** + * Initialize any memory usage from the normalized map. + * @param normalizedResources the normalized resource map. + */ + protected abstract void initializeMemory(Map<String, Double> normalizedResources); + + /** + * Normalizes a supervisor resource map or topology details map's keys to universal resource names. + * @param resourceMap resource map of either Supervisor or Topology + * @return the resource map with common resource names + */ + public static Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) { + if (resourceMap == null) { + return new HashMap<>(); + } + return new HashMap<>(resourceMap.entrySet().stream() + .collect(Collectors.toMap( + //Map the key if needed + (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()), + //Map the value + (e) -> e.getValue().doubleValue()))); + } + + /** + * Get the total amount of memory. + * @return the total amount of memory requested or provided. + */ + public abstract double getTotalMemoryMb(); + + /** + * Get the total amount of cpu. + * @return the amount of cpu. 
+ */ + public double getTotalCpu() { + return cpu; + } + + private void add(double[] resourceArray) { + int otherLength = resourceArray.length; + int length = otherResources.length; + if (otherLength > length) { + double [] newResources = new double[otherLength]; + System.arraycopy(newResources, 0, otherResources, 0, length); + otherResources = newResources; + } + for (int i = 0; i < otherLength; i++) { + otherResources[i] += resourceArray[i]; + } + } + + public void add(NormalizedResources other) { + this.cpu += other.cpu; + add(other.otherResources); + } + + /** + * Add the resources from a worker to this. + * @param value the worker resources that should be added to this. + */ + public void add(WorkerResources value) { + Map<String, Double> normalizedResources = value.get_resources(); + cpu += normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0); + add(makeArray(normalizedResources)); + } + + /** + * Remove the resources from other. This is the same as subtracting the resources in other from this. + * @param other the resources we want removed. + */ + public void remove(NormalizedResources other) { + this.cpu -= other.cpu; + assert cpu >= 0.0; + int otherLength = other.otherResources.length; + int length = otherResources.length; + if (otherLength > length) { + double [] newResources = new double[otherLength]; + System.arraycopy(newResources, 0, otherResources, 0, length); + otherResources = newResources; + } + for (int i = 0; i < Math.min(length, otherLength); i++) { + otherResources[i] -= other.otherResources[i]; + assert otherResources[i] >= 0.0; + } + } + + @Override + public String toString() { + return "CPU: " + cpu; + } + + /** + * Return a Map of the normalized resource name to a double. This should only + * be used when returning thrift resource requests to the end user. 
+ */ + public Map<String,Double> toNormalizedMap() { + HashMap<String, Double> ret = new HashMap<>(); + ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu); + int length = otherResources.length; + for (Map.Entry<String, Integer> entry: resourceNames.entrySet()) { + int index = entry.getValue(); + if (index < length) { + ret.put(entry.getKey(), otherResources[index]); + } + } + return ret; + } + + private double getResourceAt(int index) { + if (index >= otherResources.length) { + return 0.0; + } + return otherResources[index]; + } + + /** + * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. + * It does not check memory because with shared memory it is beyond the scope of this. + * @param other the resources that we want to check if they would fit in this. + * @return true if it might fit, else false if it could not possibly fit. + */ + public boolean couldHoldIgnoringMemory(NormalizedResources other) { + if (this.cpu < other.getTotalCpu()) { + return false; + } + int length = Math.max(this.otherResources.length, other.otherResources.length); + for (int i = 0; i < length; i++) { + if (getResourceAt(i) < other.getResourceAt(i)) { + return false; + } + } + return true; + } + + /** + * Calculate the average resource usage percentage with this being the total resources and + * used being the amounts used. + * @param used the amount of resources used. + * @return the average percentage used 0.0 to 100.0. + */ + public double calculateAveragePercentageUsedBy(NormalizedResources used) { + double total = 0.0; + double totalMemory = getTotalMemoryMb(); + if (totalMemory != 0.0) { + total += used.getTotalMemoryMb() / totalMemory; + } + double totalCpu = getTotalCpu(); + if (totalCpu != 0.0) { + total += used.getTotalCpu() / getTotalCpu(); + } + //If total is 0 we add in a 0% used, so we can just skip over anything that is not in both. 
+ int length = Math.min(used.otherResources.length, otherResources.length); + for (int i = 0; i < length; i++) { + if (otherResources[i] != 0.0) { + total += used.otherResources[i] / otherResources[i]; + } + } + //To get the count we divide by we need to take the maximum length because we are doing an average. + return (total * 100.0) / (2 + Math.max(otherResources.length, used.otherResources.length)); + } + + /** + * Calculate the minimum resource usage percentage with this being the total resources and + * used being the amounts used. + * @param used the amount of resources used. + * @return the minimum percentage used 0.0 to 100.0. + */ + public double calculateMinPercentageUsedBy(NormalizedResources used) { + double totalMemory = getTotalMemoryMb(); + double totalCpu = getTotalCpu(); + if (used.otherResources.length != otherResources.length + || totalMemory == 0.0 + || totalCpu == 0.0) { + //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0 + // and so the min would be 0.0 (assuming that we can never go negative on a resource being used. 
+ return 0.0; + } + double min = used.getTotalMemoryMb() / totalMemory; + min = Math.min(min, used.getTotalCpu() / getTotalCpu()); + + for (int i = 0; i < otherResources.length; i++) { + if (otherResources[i] != 0.0) { + min = Math.min(min, used.otherResources[i] / otherResources[i]); + } else { + return 0.0; //0 will be the minimum, because we count values not in here as 0 + } + } + return min * 100.0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java index 633fb5c..5350005 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ExecutorDetails; @@ -55,7 +54,6 @@ public class RAS_Node { private SupervisorDetails sup; private final Cluster cluster; private final Set<WorkerSlot> originallyFreeSlots; - private final Map<String, Double> totalResources; public RAS_Node( String nodeId, @@ -89,12 +87,6 @@ public class RAS_Node { this.sup = sup; } - if (isAlive) { - totalResources = getTotalResources(); - } else { - totalResources = new HashMap<>(); - } - HashSet<String> freeById = new HashSet<>(slots.keySet()); if (assignmentMap != null) { for (Map<String, Collection<ExecutorDetails>> assignment : assignmentMap.values()) { @@ -367,7 +359,7 @@ public class RAS_Node { ws, exec, td, - this.getTotalAvailableResources(), + getTotalAvailableResources(), 
td.getTopologyWorkerMaxHeapSize() ); } @@ -429,52 +421,33 @@ public class RAS_Node { * * @return the available memory for this node */ - public Double getAvailableMemoryResources() { - Map<String, Double> allAvailableResources = getTotalAvailableResources(); - return allAvailableResources.getOrDefault( - Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0); + public double getAvailableMemoryResources() { + return getTotalAvailableResources().getTotalMemoryMb(); } /** * Gets total resources for this node. - * - * @return Map<String, Double> of all resources */ - public Map<String, Double> getTotalResources() { + public NormalizedResourceOffer getTotalResources() { if (sup != null) { return sup.getTotalResources(); } else { - return new HashMap<>(); + return new NormalizedResourceOffer(); } } /** * Gets all available resources for this node. * - * @return Map<String, Double> of all resources + * @return All of the available resources. */ - public Map<String, Double> getTotalAvailableResources() { + public NormalizedResourceOffer getTotalAvailableResources() { if (sup != null) { - Map<String, Double> totalResources = sup.getTotalResources(); - Map<String, Double> scheduledResources = cluster.getAllScheduledResourcesForNode(sup.getId()); - Map<String, Double> availableResources = new HashMap<>(); - for (Entry resource : totalResources.entrySet()) { - if(resource.getKey() == Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME) { - availableResources.put(resource.getKey().toString(), - ObjectReader.getDouble(resource.getValue()) - - (scheduledResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0) - + scheduledResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)) - ); - continue; - } - availableResources.put(resource.getKey().toString(), - ObjectReader.getDouble(resource.getValue()) - - scheduledResources.getOrDefault(resource.getKey(), 0.0)); - - } + NormalizedResourceOffer availableResources = new 
NormalizedResourceOffer(sup.getTotalResources()); + availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId())); return availableResources; } else { - return new HashMap<>(); + return new NormalizedResourceOffer(); } } @@ -483,7 +456,7 @@ public class RAS_Node { * * @return the total memory for this node */ - public Double getTotalMemoryResources() { + public double getTotalMemoryResources() { if (sup != null) { return sup.getTotalMemory(); } else { @@ -497,7 +470,7 @@ public class RAS_Node { * @return the available cpu for this node */ public double getAvailableCpuResources() { - return getTotalAvailableResources().getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0); + return getTotalAvailableResources().getTotalCpu(); } /** @@ -505,9 +478,9 @@ public class RAS_Node { * * @return the total cpu for this node */ - public Double getTotalCpuResources() { + public double getTotalCpuResources() { if (sup != null) { - return sup.getTotalCPU(); + return sup.getTotalCpu(); } else { return 0.0; } http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java index 9b0ee15..83ee4cb 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java @@ -18,37 +18,38 @@ package org.apache.storm.scheduler.resource; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.storm.Config; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.ComponentCommon; import 
org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; -import org.apache.storm.utils.ObjectReader; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.storm.Constants.resourceNameMapping; - public class ResourceUtils { private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class); - public static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology, + public static NormalizedResourceRequest getBoltResources(StormTopology topology, Map<String, Object> topologyConf, + String componentId) { + if (topology.get_bolts() != null) { + Bolt bolt = topology.get_bolts().get(componentId); + return new NormalizedResourceRequest(bolt.get_common(), topologyConf); + } + return null; + } + + public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) { - Map<String, Map<String, Double>> boltResources = new HashMap<>(); + Map<String, NormalizedResourceRequest> boltResources = new HashMap<>(); if (topology.get_bolts() != null) { for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) { - Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf()); - checkInitialization(topologyResources, bolt.getValue().toString(), topologyConf); + NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(bolt.getValue().get_common(), topologyConf); if (LOG.isTraceEnabled()) { LOG.trace("Turned {} into {}", bolt.getValue().get_common().get_json_conf(), topologyResources); } @@ -58,13 +59,21 @@ public class ResourceUtils { return boltResources; } - public static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology, + public static NormalizedResourceRequest getSpoutResources(StormTopology topology, Map<String, Object> 
topologyConf, + String componentId) { + if (topology.get_spouts() != null) { + SpoutSpec spout = topology.get_spouts().get(componentId); + return new NormalizedResourceRequest(spout.get_common(), topologyConf); + } + return null; + } + + public static Map<String, NormalizedResourceRequest> getSpoutsResources(StormTopology topology, Map<String, Object> topologyConf) { - Map<String, Map<String, Double>> spoutResources = new HashMap<>(); + Map<String, NormalizedResourceRequest> spoutResources = new HashMap<>(); if (topology.get_spouts() != null) { for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) { - Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf()); - checkInitialization(topologyResources, spout.getValue().toString(), topologyConf); + NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(spout.getValue().get_common(), topologyConf); if (LOG.isTraceEnabled()) { LOG.trace("Turned {} into {}", spout.getValue().get_common().get_json_conf(), topologyResources); } @@ -139,187 +148,4 @@ public class ResourceUtils { } } - public static void checkInitialization(Map<String, Double> topologyResources, - String componentId, Map<String, Object> topologyConf) { - StringBuilder msgBuilder = new StringBuilder(); - - Set<String> resourceNameSet = new HashSet<>(); - - resourceNameSet.add( - Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT - ); - resourceNameSet.add( - Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB - ); - resourceNameSet.add( - Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB - ); - - Map<String, Double> topologyComponentResourcesMap = - (Map<String, Double>) topologyConf.getOrDefault( - Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()); - - resourceNameSet.addAll(topologyResources.keySet()); - resourceNameSet.addAll(topologyComponentResourcesMap.keySet()); - - for (String resourceName : resourceNameSet) { - 
msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName)); - } - - Map<String, Double> normalizedTopologyResources = normalizedResourceMap(topologyResources); - topologyResources.clear(); - topologyResources.putAll(normalizedTopologyResources); - - if (msgBuilder.length() > 0) { - String resourceDefaults = msgBuilder.toString(); - LOG.debug( - "Unable to extract resource requirement for Component {} \n Resources : {}", - componentId, resourceDefaults); - } - } - - private static String checkInitResource(Map<String, Double> topologyResources, Map topologyConf, - Map<String, Double> topologyComponentResourcesMap, String resourceName) { - StringBuilder msgBuilder = new StringBuilder(); - String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName); - if (!topologyResources.containsKey(normalizedResourceName)) { - if (topologyConf.containsKey(resourceName)) { - Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName)); - if (resourceValue != null) { - topologyResources.put(normalizedResourceName, resourceValue); - } - } - - if (topologyComponentResourcesMap.containsKey(normalizedResourceName)) { - Double resourceValue = ObjectReader.getDouble(topologyComponentResourcesMap.get(resourceName)); - if (resourceValue != null) { - topologyResources.put(normalizedResourceName, resourceValue); - } - } - } - - return msgBuilder.toString(); - } - - public static Map<String, Double> parseResources(String input) { - Map<String, Double> topologyResources = new HashMap<>(); - JSONParser parser = new JSONParser(); - LOG.debug("Input to parseResources {}", input); - try { - if (input != null) { - Object obj = parser.parse(input); - JSONObject jsonObject = (JSONObject) obj; - - // Legacy resource parsing - if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { - Double topoMemOnHeap = ObjectReader - 
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap); - } - if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { - Double topoMemOffHeap = ObjectReader - .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap); - } - if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { - Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), - null); - topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu); - } - - // If resource is also present in resources map will overwrite the above - if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { - Map<String, Number> rawResourcesMap = - (Map<String, Number>) jsonObject.computeIfAbsent( - Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>()); - - for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) { - topologyResources.put( - stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue()); - } - - - } - } - } catch (ParseException e) { - LOG.error("Failed to parse component resources is:" + e.toString(), e); - return null; - } - return normalizedResourceMap(topologyResources); - } - - /** - * Calculate the sum of a collection of doubles. - * @param list collection of doubles - * @return the sum of of collection of doubles - */ - public static double sum(Collection<Double> list) { - double sum = 0.0; - for (Double elem : list) { - sum += elem; - } - return sum; - } - - /** - * Calculate the average of a collection of doubles. 
- * @param list a collection of doubles - * @return the average of collection of doubles - */ - public static double avg(Collection<Double> list) { - return sum(list) / list.size(); - } - - /** - * Normalizes a supervisor resource map or topology details map's keys to universal resource names. - * @param resourceMap resource map of either Supervisor or Topology - * @return the resource map with common resource names - */ - public static Map<String, Double> normalizedResourceMap(Map<String, Double> resourceMap) { - Map<String, Double> result = new HashMap(); - - result.putAll(resourceMap); - for (Map.Entry entry: resourceMap.entrySet()) { - if (resourceNameMapping.containsKey(entry.getKey())) { - result.put(resourceNameMapping.get(entry.getKey()), ObjectReader.getDouble(entry.getValue(), 0.0)); - result.remove(entry.getKey()); - } - } - return result; - } - - public static Map<String, Double> addResources(Map<String, Double> resourceMap1, Map<String, Double> resourceMap2) { - Map<String, Double> result = new HashMap(); - - result.putAll(resourceMap1); - - for (Map.Entry<String, Double> entry: resourceMap2.entrySet()) { - if (result.containsKey(entry.getKey())) { - result.put(entry.getKey(), ObjectReader.getDouble(entry.getValue(), - 0.0) + ObjectReader.getDouble(resourceMap1.get(entry.getKey()), 0.0)); - } else { - result.put(entry.getKey(), entry.getValue()); - } - } - return result; - - } - - public static Double getMinValuePresentInResourceMap(Map<String, Double> resourceMap) { - return Collections.min(resourceMap.values()); - } - - public static Map<String, Double> getPercentageOfTotalResourceMap(Map<String, Double> resourceMap, Map<String, Double> totalResourceMap) { - Map<String, Double> result = new HashMap(); - - for(Map.Entry<String, Double> entry: totalResourceMap.entrySet()) { - if (resourceMap.containsKey(entry.getKey())) { - result.put(entry.getKey(), (ObjectReader.getDouble(resourceMap.get(entry.getKey()))/ entry.getValue()) * 100.0) ; - } else { - 
result.put(entry.getKey(), 0.0); - } - } - return result; - - } } http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java index 07c908e..67bc867 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java @@ -39,6 +39,7 @@ import org.apache.storm.scheduler.Component; import org.apache.storm.scheduler.ExecutorDetails; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.NormalizedResourceOffer; import org.apache.storm.scheduler.resource.RAS_Node; import org.apache.storm.scheduler.resource.RAS_Nodes; import org.apache.storm.scheduler.resource.ResourceUtils; @@ -129,9 +130,9 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { * a class to contain individual object resources as well as cumulative stats. 
*/ static class AllResources { - List<ObjectResources> objectResources = new LinkedList<ObjectResources>(); - Map<String, Double> availableResourcesOverall = new HashMap<>(); - Map<String, Double> totalResourcesOverall = new HashMap<>(); + List<ObjectResources> objectResources = new LinkedList<>(); + NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer(); + NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer(); String identifier; public AllResources(String identifier) { @@ -139,19 +140,19 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { } public AllResources(AllResources other) { - this( null, - new HashMap<>(other.availableResourcesOverall), - new HashMap(other.totalResourcesOverall), - other.identifier); + this (null, + new NormalizedResourceOffer(other.availableResourcesOverall), + new NormalizedResourceOffer(other.totalResourcesOverall), + other.identifier); List<ObjectResources> objectResourcesList = new ArrayList<>(); for (ObjectResources objectResource : other.objectResources) { objectResourcesList.add(new ObjectResources(objectResource)); } this.objectResources = objectResourcesList; - } - public AllResources(List<ObjectResources> objectResources, Map<String, Double> availableResourcesOverall, Map<String, Double> totalResourcesOverall, String identifier) { + public AllResources(List<ObjectResources> objectResources, NormalizedResourceOffer availableResourcesOverall, + NormalizedResourceOffer totalResourcesOverall, String identifier) { this.objectResources = objectResources; this.availableResourcesOverall = availableResourcesOverall; this.totalResourcesOverall = totalResourcesOverall; @@ -163,10 +164,10 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { * class to keep track of resources on a rack or node. 
*/ static class ObjectResources { - String id; - Map<String, Double> availableResources = new HashMap<>(); - Map<String, Double> totalResources = new HashMap<>(); - double effectiveResources = 0.0; + public final String id; + public NormalizedResourceOffer availableResources = new NormalizedResourceOffer(); + public NormalizedResourceOffer totalResources = new NormalizedResourceOffer(); + public double effectiveResources = 0.0; public ObjectResources(String id) { this.id = id; @@ -176,7 +177,8 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { this(other.id, other.availableResources, other.totalResources, other.effectiveResources); } - public ObjectResources(String id, Map<String, Double> availableResources, Map<String, Double> totalResources, double effectiveResources) { + public ObjectResources(String id, NormalizedResourceOffer availableResources, NormalizedResourceOffer totalResources, + double effectiveResources) { this.id = id; this.availableResources = availableResources; this.totalResources = totalResources; @@ -220,10 +222,8 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { node.totalResources = rasNode.getTotalResources(); nodes.add(node); - allResources.availableResourcesOverall = ResourceUtils.addResources( - allResources.availableResourcesOverall, node.availableResources); - allResources.totalResourcesOverall = ResourceUtils.addResources( - allResources.totalResourcesOverall, node.totalResources); + allResources.availableResourcesOverall.add(node.availableResources); + allResources.totalResourcesOverall.add(node.totalResources); } @@ -234,7 +234,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { allResources.totalResourcesOverall); String topoId = topologyDetails.getId(); - return this.sortObjectResources( + return sortObjectResources( allResources, exec, topologyDetails, @@ -243,7 +243,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { public int 
getNumExistingSchedule(String objectId) { //Get execs already assigned in rack - Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>(); + Collection<ExecutorDetails> execs = new LinkedList<>(); if (cluster.getAssignmentById(topoId) != null) { for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { @@ -331,13 +331,13 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { racks.add(rack); for (String nodeId : nodeIds) { RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId)); - rack.availableResources = ResourceUtils.addResources(rack.availableResources, node.getTotalAvailableResources()); - rack.totalResources = ResourceUtils.addResources(rack.totalResources, node.getTotalResources()); + rack.availableResources.add(node.getTotalAvailableResources()); + rack.totalResources.add(node.getTotalResources()); nodeIdToRackId.put(nodeId, rack.id); - allResources.totalResourcesOverall = ResourceUtils.addResources(allResources.totalResourcesOverall, rack.totalResources); - allResources.availableResourcesOverall = ResourceUtils.addResources(allResources.availableResourcesOverall, rack.availableResources); + allResources.totalResourcesOverall.add(rack.totalResources); + allResources.availableResourcesOverall.add(rack.availableResources); } } @@ -347,30 +347,27 @@ public abstract class BaseResourceAwareStrategy implements IStrategy { allResources.totalResourcesOverall); String topoId = topologyDetails.getId(); - return this.sortObjectResources( + return sortObjectResources( allResources, exec, topologyDetails, - new ExistingScheduleFunc() { - @Override - public int getNumExistingSchedule(String objectId) { - String rackId = objectId; - //Get execs already assigned in rack - Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>(); - if (cluster.getAssignmentById(topoId) != null) { - for (Map.Entry<ExecutorDetails, WorkerSlot> entry : - 
cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { - String nodeId = entry.getValue().getNodeId(); - String hostname = idToNode(nodeId).getHostname(); - ExecutorDetails exec = entry.getKey(); - if (nodeIdToRackId.get(hostname) != null - && nodeIdToRackId.get(hostname).equals(rackId)) { - execs.add(exec); - } + (objectId) -> { + String rackId = objectId; + //Get execs already assigned in rack + Collection<ExecutorDetails> execs = new LinkedList<>(); + if (cluster.getAssignmentById(topoId) != null) { + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : + cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { + String nodeId = entry.getValue().getNodeId(); + String hostname = idToNode(nodeId).getHostname(); + ExecutorDetails exec1 = entry.getKey(); + if (nodeIdToRackId.get(hostname) != null + && nodeIdToRackId.get(hostname).equals(rackId)) { + execs.add(exec1); } } - return execs.size(); } + return execs.size(); }); } http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java index adf05f5..fc746b0 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java @@ -68,7 +68,7 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors); List<String> favoredNodes = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES); List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES); - final List<ObjectResources> sortedNodes = this.sortAllNodes(td, null, favoredNodes, unFavoredNodes); + final List<ObjectResources> sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes); for (ExecutorDetails exec : orderedExecutors) { LOG.debug( @@ -127,32 +127,8 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl final ExistingScheduleFunc existingScheduleFunc) { for (ObjectResources objectResources : allResources.objectResources) { - StringBuilder sb = new StringBuilder(); - if (ResourceUtils.getMinValuePresentInResourceMap(objectResources.availableResources) <= 0) { - objectResources.effectiveResources = 0.0; - } else { - List<Double> values = new LinkedList<>(); - - Map<String, Double> percentageTotal = ResourceUtils.getPercentageOfTotalResourceMap( - objectResources.availableResources, allResources.availableResourcesOverall - ); - for(Map.Entry<String, Double> percentageEntry : percentageTotal.entrySet()) { - values.add(percentageEntry.getValue()); - sb.append(String.format("%s %f(%f%%) ", percentageEntry.getKey(), - objectResources.availableResources.get(percentageEntry.getKey()), - percentageEntry.getValue()) - ); - - } - - objectResources.effectiveResources = Collections.min(values); - } - LOG.debug( - "{}: Avail [ {} ] Total [ {} ] effective resources: {}", - objectResources.id, - sb.toString(), - objectResources.totalResources, - objectResources.effectiveResources); + objectResources.effectiveResources = + allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources); } TreeSet<ObjectResources> sortedObjectResources = @@ -169,13 +145,8 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl } else if (o1.effectiveResources < o2.effectiveResources) { return 1; } else { - 
Collection<Double> o1Values = ResourceUtils.getPercentageOfTotalResourceMap( - o1.availableResources, allResources.availableResourcesOverall).values(); - Collection<Double> o2Values = ResourceUtils.getPercentageOfTotalResourceMap( - o2.availableResources, allResources.availableResourcesOverall).values(); - - double o1Avg = ResourceUtils.avg(o1Values); - double o2Avg = ResourceUtils.avg(o2Values); + double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); + double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); if (o1Avg > o2Avg) { return -1; http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java index a4de7c2..893e289 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java @@ -18,8 +18,6 @@ package org.apache.storm.scheduler.resource.strategies.scheduling; -import com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -27,7 +25,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeSet; - import org.apache.storm.Config; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.Component; @@ -133,31 +130,7 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, final ExistingScheduleFunc existingScheduleFunc) { - Map<String, Double> requestedResources = topologyDetails.getTotalResources(exec); AllResources affinityBasedAllResources = new AllResources(allResources); - for (ObjectResources objectResources : affinityBasedAllResources.objectResources) { - StringBuilder sb = new StringBuilder(); - List<Double> values = new LinkedList<>(); - - for (Map.Entry<String, Double> availableResourcesEntry : objectResources.availableResources.entrySet()) { - if (!requestedResources.containsKey(availableResourcesEntry.getKey())) { - objectResources.availableResources.put(availableResourcesEntry.getKey(), -1.0 * availableResourcesEntry.getValue()); - } - } - - Map<String, Double> percentageTotal = ResourceUtils.getPercentageOfTotalResourceMap( - objectResources.availableResources, allResources.availableResourcesOverall - ); - for(Map.Entry<String, Double> percentageEntry : percentageTotal.entrySet()) { - values.add(percentageEntry.getValue()); - sb.append(String.format("%s %f(%f%%) ", percentageEntry.getKey(), - objectResources.availableResources.get(percentageEntry.getKey()), - percentageEntry.getValue()) - ); - - } - - } TreeSet<ObjectResources> sortedObjectResources = new TreeSet<>((o1, o2) -> { @@ -168,13 +141,8 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl } else if (execsScheduled1 < execsScheduled2) { return 1; } else { - Collection<Double> o1Values = ResourceUtils.getPercentageOfTotalResourceMap( - o1.availableResources, allResources.availableResourcesOverall).values(); - Collection<Double> o2Values = ResourceUtils.getPercentageOfTotalResourceMap( - o2.availableResources, allResources.availableResourcesOverall).values(); - - double o1Avg = ResourceUtils.avg(o1Values); - double o2Avg = ResourceUtils.avg(o2Values); + double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); + double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); if (o1Avg > o2Avg) { return -1; http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index bd4c4fe..a64c9b4 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -73,6 +73,7 @@ import org.apache.storm.generated.ReadableBlobMeta; import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.generated.StormTopology; import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.scheduler.resource.NormalizedResourceRequest; import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.security.auth.SingleUserPrincipal; import org.apache.thrift.TException; @@ -715,30 +716,33 @@ public class ServerUtils { return false; } - public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { + public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> topoConf, StormTopology topology) + throws InvalidTopologyException { return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) / ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB))); } - public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { + public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) + throws 
InvalidTopologyException { Map<String, Integer> componentParallelism = getComponentParallelism(topoConf, topology); double totalMemoryRequired = 0.0; - for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) { + for (Map.Entry<String, NormalizedResourceRequest> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) { int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1); - double memoryRequirement = entry.getValue().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME); + double memoryRequirement = entry.getValue().getOnHeapMemoryMb(); totalMemoryRequired += memoryRequirement * parallelism; } - for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) { + for (Map.Entry<String, NormalizedResourceRequest> entry: ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) { int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1); - double memoryRequirement = entry.getValue().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME); + double memoryRequirement = entry.getValue().getOnHeapMemoryMb(); totalMemoryRequired += memoryRequirement * parallelism; } return totalMemoryRequired; } - public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { + public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) + throws InvalidTopologyException { Map<String, Integer> ret = new HashMap<>(); Map<String, Object> components = StormCommon.allComponents(topology); for (Map.Entry<String, Object> entry : components.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index 16c4fb2..38b68c0 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -333,7 +333,7 @@ public class TestResourceAwareScheduler { executorsOnSupervisor.add(entry.getKey()); } for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) { - Double supervisorTotalCpu = entry.getKey().getTotalCPU(); + Double supervisorTotalCpu = entry.getKey().getTotalCpu(); Double supervisorTotalMemory = entry.getKey().getTotalMemory(); Double supervisorUsedCpu = 0.0; Double supervisorUsedMemory = 0.0; @@ -501,6 +501,10 @@ public class TestResourceAwareScheduler { @Test public void testHeterogeneousCluster() { + Map<String, Double> test = new HashMap<>(); + test.put("gpu.count", 0.0); + new NormalizedResourceOffer(test); + LOG.info("\n\n\t\ttestHeterogeneousCluster"); INimbus iNimbus = new INimbusTest(); Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0); @@ -509,10 +513,10 @@ public class TestResourceAwareScheduler { resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0); resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0); - resourceMap1 = ResourceUtils.normalizedResourceMap(resourceMap1); - resourceMap2 = ResourceUtils.normalizedResourceMap(resourceMap2); + resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1); + resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2); - Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>(); + Map<String, SupervisorDetails> supMap = new HashMap<>(); for (int i = 0; i < 2; i++) { List<Number> ports = new 
LinkedList<>(); for (int j = 0; j < 4; j++) { @@ -521,6 +525,7 @@ public class TestResourceAwareScheduler { SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, i == 0 ? resourceMap1 : resourceMap2); supMap.put(sup.getId(), sup); } + LOG.info("SUPERVISORS = {}", supMap); // topo1 has one single huge task that can not be handled by the small-super TopologyBuilder builder1 = new TopologyBuilder(); @@ -569,6 +574,7 @@ public class TestResourceAwareScheduler { // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division ResourceAwareScheduler rs = new ResourceAwareScheduler(); + LOG.info("\n\n\t\tScheduling topologies 1, 2 and 3"); Topologies topologies = new Topologies(topology1, topology2, topology3); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config1); rs.prepare(config1); @@ -583,14 +589,17 @@ public class TestResourceAwareScheduler { final Double EPSILON = 0.0001; for (SupervisorDetails supervisor : supMap.values()) { - Double cpuAvailable = supervisor.getTotalCPU(); + Double cpuAvailable = supervisor.getTotalCpu(); Double memAvailable = supervisor.getTotalMemory(); Double cpuUsed = superToCpu.get(supervisor); Double memUsed = superToMem.get(supervisor); - assertTrue((Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON)); + + assertTrue(supervisor.getId() + " MEM: "+ memAvailable + " == " + memUsed + " OR CPU: " + cpuAvailable + " == " + cpuUsed, + (Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON)); } // end of Test1 + LOG.warn("\n\n\t\tSwitching to topologies 1, 2 and 4"); // Test2: Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled topologies = new Topologies(topology1, topology2, topology4); cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config1); @@ -598,17 +607,21 @@ 
public class TestResourceAwareScheduler { rs.schedule(topologies, cluster); int numTopologiesAssigned = 0; if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + LOG.info("TOPO 1 scheduled"); numTopologiesAssigned++; } if (cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + LOG.info("TOPO 2 scheduled"); numTopologiesAssigned++; } if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) { + LOG.info("TOPO 4 scheduled"); numTopologiesAssigned++; } assertEquals(2, numTopologiesAssigned); //end of Test2 + LOG.info("\n\n\t\tScheduling just topo 5"); //Test3: "Launch topo5 only, both mem and cpu should be exactly used up" topologies = new Topologies(topology5); cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config1); @@ -617,7 +630,7 @@ public class TestResourceAwareScheduler { superToCpu = getSupervisorToCpuUsage(cluster, topologies); superToMem = getSupervisorToMemoryUsage(cluster, topologies); for (SupervisorDetails supervisor : supMap.values()) { - Double cpuAvailable = supervisor.getTotalCPU(); + Double cpuAvailable = supervisor.getTotalCpu(); Double memAvailable = supervisor.getTotalMemory(); Double cpuUsed = superToCpu.get(supervisor); Double memUsed = superToMem.get(supervisor); http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index 7fa3c3b..4a2e644 100644 --- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -65,7 +65,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap; +import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap; public class TestUtilsForResourceAwareScheduler { private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class); http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java index 5990d2c..0f7fed9 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java @@ -201,7 +201,7 @@ public class TestDefaultResourceAwareStrategy { @Test public void testMultipleRacks() { - final Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>(); + final Map<String, SupervisorDetails> supMap = new HashMap<>(); final Map<String, SupervisorDetails> supMapRack1 = genSupervisors(10, 4, 0, 400, 8000); //generate another rack of supervisors with less resources final Map<String, SupervisorDetails> supMapRack2 = genSupervisors(10, 4, 10, 200, 4000); @@ -229,7 +229,7 @@ public class 
TestDefaultResourceAwareStrategy { DNSToSwitchMapping TestNetworkTopographyPlugin = new DNSToSwitchMapping() { @Override public Map<String, String> resolve(List<String> names) { - Map<String, String> ret = new HashMap<String, String>(); + Map<String, String> ret = new HashMap<>(); for (SupervisorDetails sup : supMapRack1.values()) { ret.put(sup.getHost(), "rack-0"); } @@ -267,7 +267,7 @@ public class TestDefaultResourceAwareStrategy { String rack = entry.getValue(); List<String> nodesForRack = rackToNodes.get(rack); if (nodesForRack == null) { - nodesForRack = new ArrayList<String>(); + nodesForRack = new ArrayList<>(); rackToNodes.put(rack, nodesForRack); } nodesForRack.add(hostName); @@ -277,7 +277,8 @@ public class TestDefaultResourceAwareStrategy { DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy(); rs.prepare(cluster); - TreeSet<ObjectResources> sortedRacks= rs.sortRacks(null, topo1); + TreeSet<ObjectResources> sortedRacks = rs.sortRacks(null, topo1); + LOG.info("Sorted Racks {}", sortedRacks); Assert.assertEquals("# of racks sorted", 5, sortedRacks.size()); Iterator<ObjectResources> it = sortedRacks.iterator();
