Repository: storm
Updated Branches:
  refs/heads/master 3cf71d2c6 -> becab7cc2


http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
new file mode 100644
index 0000000..926184c
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A request that has been normalized.
+ */
+public class NormalizedResourceRequest extends NormalizedResources {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+    private static void putIfMissing(Map<String, Double> dest, String destKey, 
Map<String, Object> src, String srcKey) {
+        if (!dest.containsKey(destKey)) {
+            Number value = (Number)src.get(srcKey);
+            if (value != null) {
+                dest.put(destKey, value.doubleValue());
+            }
+        }
+    }
+
+    private static Map<String, Double> getDefaultResources(Map<String, Object> 
topoConf) {
+        Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) 
topoConf.getOrDefault(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+        putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, 
Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        return ret;
+    }
+
+    private static Map<String, Double> parseResources(String input) {
+        Map<String, Double> topologyResources = new HashMap<>();
+        JSONParser parser = new JSONParser();
+        LOG.debug("Input to parseResources {}", input);
+        try {
+            if (input != null) {
+                Object obj = parser.parse(input);
+                JSONObject jsonObject = (JSONObject) obj;
+
+                // Legacy resource parsing
+                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+                    Double topoMemOnHeap = ObjectReader
+                        
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
 null);
+                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
topoMemOnHeap);
+                }
+                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) 
{
+                    Double topoMemOffHeap = ObjectReader
+                        
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
 null);
+                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
topoMemOffHeap);
+                }
+                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+                    Double topoCpu = 
ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+                        null);
+                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+                }
+
+                // If resource is also present in resources map will overwrite 
the above
+                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+                    Map<String, Number> rawResourcesMap =
+                        (Map<String, Number>) jsonObject.computeIfAbsent(
+                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> 
new HashMap<>());
+
+                    for (Map.Entry<String, Number> stringNumberEntry : 
rawResourcesMap.entrySet()) {
+                        topologyResources.put(
+                            stringNumberEntry.getKey(), 
stringNumberEntry.getValue().doubleValue());
+                    }
+
+
+                }
+            }
+        } catch (ParseException e) {
+            LOG.error("Failed to parse component resources is:" + 
e.toString(), e);
+            return null;
+        }
+        return topologyResources;
+    }
+
+    private double onHeap;
+    private double offHeap;
+
+    /**
+     * Create a new normalized set of resources.  Note that memory is not 
covered here because it is not consistent in requests vs offers
+     * because of how on heap vs off heap is used.
+     *
+     * @param resources the resources to be normalized.
+     * @param topologyConf the config for the topology
+     */
+    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+                                     Map<String, Object> topologyConf) {
+        super(resources, getDefaultResources(topologyConf));
+    }
+
+    public NormalizedResourceRequest(ComponentCommon component, Map<String, 
Object> topoConf) {
+        this(parseResources(component.get_json_conf()), topoConf);
+    }
+
+    public NormalizedResourceRequest(Map<String, Object> topoConf) {
+        this((Map<String, ? extends Number>) null, topoConf);
+    }
+
+    public NormalizedResourceRequest() {
+        super(null, null);
+    }
+
+    @Override
+    public Map<String,Double> toNormalizedMap() {
+        Map<String, Double> ret = super.toNormalizedMap();
+        ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+        ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+        return ret;
+    }
+
+    @Override
+    protected void initializeMemory(Map<String, Double> normalizedResources) {
+        onHeap = 
normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
0.0);
+        offHeap = 
normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
0.0);
+    }
+
+    public double getOnHeapMemoryMb() {
+        return onHeap;
+    }
+
+    public void addOnHeap(final double onHeap) {
+        this.onHeap += onHeap;
+    }
+
+    public double getOffHeapMemoryMb() {
+        return offHeap;
+    }
+
+    public void addOffHeap(final double offHeap) {
+        this.offHeap += offHeap;
+    }
+
+    /**
+     * Add the resources in other to this.
+     * @param other the other Request to add to this.
+     */
+    public void add(NormalizedResourceRequest other) {
+        super.add(other);
+        onHeap += other.onHeap;
+        offHeap += other.offHeap;
+    }
+
+    @Override
+    public void add(WorkerResources value) {
+        super.add(value);
+        //The resources are already normalized
+        Map<String, Double> resources = value.get_resources();
+        onHeap += 
resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap += 
resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+    }
+
+    @Override
+    public double getTotalMemoryMb() {
+        return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " onHeap: " + onHeap + " offHeap: " + 
offHeap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
new file mode 100644
index 0000000..8ed1a57
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import static org.apache.storm.Constants.*;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized.
+ */
+public abstract class NormalizedResources {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NormalizedResources.class);
+    public static final Map<String, String> RESOURCE_NAME_MAPPING;
+
+    static {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 
COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 
COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+        RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
+    }
+
+    private static double[] makeArray(Map<String, Double> normalizedResources) 
{
+        //To avoid locking we will go through the map twice.  It should be 
small so it is probably not a big deal
+        for (String key : normalizedResources.keySet()) {
+            //We are going to skip over CPU and Memory, because they are 
captured elsewhere
+            if (!COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+                resourceNames.computeIfAbsent(key, (k) -> 
counter.getAndIncrement());
+            }
+        }
+        //By default all of the values are 0
+        double [] ret = new double[counter.get()];
+        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) 
{
+            Integer index = resourceNames.get(entry.getKey());
+            if (index != null) {
+                //index == null if it is memory or CPU
+                ret[index] = entry.getValue();
+            }
+        }
+        return ret;
+    }
+
+    private static final ConcurrentMap<String, Integer> resourceNames = new 
ConcurrentHashMap<>();
+    private static final AtomicInteger counter = new AtomicInteger(0);
+    private double cpu;
+    private double[] otherResources;
+
+    public NormalizedResources(NormalizedResources other) {
+        cpu = other.cpu;
+        otherResources = Arrays.copyOf(other.otherResources, 
other.otherResources.length);
+    }
+
+    /**
+     * Create a new normalized set of resources.  Note that memory is not
+     * covered here because it is not consistent in requests vs offers because
+     * of how on heap vs off heap is used.
+     * @param resources the resources to be normalized.
+     * @param defaults the default resources that will also be normalized and 
combined with the real resources.
+     */
+    public NormalizedResources(Map<String, ? extends Number> resources, 
Map<String, ? extends Number> defaults) {
+        Map<String, Double> normalizedResources = 
normalizedResourceMap(defaults);
+        normalizedResources.putAll(normalizedResourceMap(resources));
+        cpu = 
normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        otherResources = makeArray(normalizedResources);
+        initializeMemory(normalizedResources);
+    }
+
+    /**
+     * Initialize any memory usage from the normalized map.
+     * @param normalizedResources the normalized resource map.
+     */
+    protected abstract void initializeMemory(Map<String, Double> 
normalizedResources);
+
+    /**
+     * Normalizes a supervisor resource map or topology details map's keys to 
universal resource names.
+     * @param resourceMap resource map of either Supervisor or Topology
+     * @return the resource map with common resource names
+     */
+    public static Map<String, Double> normalizedResourceMap(Map<String, ? 
extends Number> resourceMap) {
+        if (resourceMap == null) {
+            return new HashMap<>();
+        }
+        return new HashMap<>(resourceMap.entrySet().stream()
+            .collect(Collectors.toMap(
+                //Map the key if needed
+                (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), 
e.getKey()),
+                //Map the value
+                (e) -> e.getValue().doubleValue())));
+    }
+
+    /**
+     * Get the total amount of memory.
+     * @return the total amount of memory requested or provided.
+     */
+    public abstract double getTotalMemoryMb();
+
+    /**
+     * Get the total amount of cpu.
+     * @return the amount of cpu.
+     */
+    public double getTotalCpu() {
+        return cpu;
+    }
+
+    private void add(double[] resourceArray) { // element-wise += of resourceArray into otherResources
+        int otherLength = resourceArray.length;
+        int length = otherResources.length;
+        if (otherLength > length) {
+            double [] newResources = new double[otherLength]; // zero-filled, so the new tail starts at 0.0
+            System.arraycopy(otherResources, 0, newResources, 0, length); // src is the OLD array: keep current values
+            otherResources = newResources;
+        }
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] += resourceArray[i];
+        }
+    }
+
+    public void add(NormalizedResources other) {
+        this.cpu += other.cpu;
+        add(other.otherResources);
+    }
+
+    /**
+     * Add the resources from a worker to this.
+     * @param value the worker resources that should be added to this.
+     */
+    public void add(WorkerResources value) {
+        Map<String, Double> normalizedResources = value.get_resources();
+        cpu += 
normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        add(makeArray(normalizedResources));
+    }
+
+    /**
+     * Remove the resources from other.  This is the same as subtracting the 
resources in other from this.
+     * @param other the resources we want removed.
+     */
+    public void remove(NormalizedResources other) {
+        this.cpu -= other.cpu;
+        assert cpu >= 0.0;
+        int otherLength = other.otherResources.length;
+        int length = otherResources.length; // captured BEFORE any growth below
+        if (otherLength > length) {
+            double [] newResources = new double[otherLength]; // zero-filled, so the new tail starts at 0.0
+            System.arraycopy(otherResources, 0, newResources, 0, length); // src is the OLD array: keep current values
+            otherResources = newResources;
+        }
+        for (int i = 0; i < Math.min(length, otherLength); i++) { // NOTE(review): slots past the old length are not subtracted - confirm intended
+            otherResources[i] -= other.otherResources[i];
+            assert otherResources[i] >= 0.0;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "CPU: " + cpu;
+    }
+
+    /**
+     * Return a Map of the normalized resource name to a double.  This should 
only
+     * be used when returning thrift resource requests to the end user.
+     */
+    public Map<String,Double> toNormalizedMap() {
+        HashMap<String, Double> ret = new HashMap<>();
+        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        int length = otherResources.length;
+        for (Map.Entry<String, Integer> entry: resourceNames.entrySet()) {
+            int index = entry.getValue();
+            if (index < length) {
+                ret.put(entry.getKey(), otherResources[index]);
+            }
+        }
+        return ret;
+    }
+
+    private double getResourceAt(int index) {
+        if (index >= otherResources.length) {
+            return 0.0;
+        }
+        return otherResources[index];
+    }
+
+    /**
+     * A simple sanity check to see if all of the resources in this would be 
large enough to hold the resources in other ignoring memory.
+     * It does not check memory because with shared memory it is beyond the 
scope of this.
+     * @param other the resources that we want to check if they would fit in 
this.
+     * @return true if it might fit, else false if it could not possibly fit.
+     */
+    public boolean couldHoldIgnoringMemory(NormalizedResources other) {
+        if (this.cpu < other.getTotalCpu()) {
+            return false;
+        }
+        int length = Math.max(this.otherResources.length, 
other.otherResources.length);
+        for (int i = 0; i < length; i++) {
+            if (getResourceAt(i) < other.getResourceAt(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Calculate the average resource usage percentage with this being the 
total resources and
+     * used being the amounts used.
+     * @param used the amount of resources used.
+     * @return the average percentage used 0.0 to 100.0.
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+        double total = 0.0;
+        double totalMemory = getTotalMemoryMb();
+        if (totalMemory != 0.0) {
+            total += used.getTotalMemoryMb() / totalMemory;
+        }
+        double totalCpu = getTotalCpu();
+        if (totalCpu != 0.0) {
+            total += used.getTotalCpu() / getTotalCpu();
+        }
+        //If total is 0 we add in a 0% used, so we can just skip over anything 
that is not in both.
+        int length = Math.min(used.otherResources.length, 
otherResources.length);
+        for (int i = 0; i < length; i++) {
+            if (otherResources[i] != 0.0) {
+                total += used.otherResources[i] / otherResources[i];
+            }
+        }
+        //To get the count we divide by we need to take the maximum length 
because we are doing an average.
+        return (total * 100.0) / (2 + Math.max(otherResources.length, 
used.otherResources.length));
+    }
+
+    /**
+     * Calculate the minimum resource usage percentage with this being the 
total resources and
+     * used being the amounts used.
+     * @param used the amount of resources used.
+     * @return the minimum percentage used 0.0 to 100.0.
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResources used) {
+        double totalMemory = getTotalMemoryMb();
+        double totalCpu = getTotalCpu();
+        if (used.otherResources.length != otherResources.length
+            || totalMemory == 0.0
+            || totalCpu == 0.0) {
+            //If the lengths don't match one of the resources will be 0, which 
means we would calculate the percentage to be 0.0
+            // and so the min would be 0.0 (assuming that we can never go 
negative on a resource being used).
+            return 0.0;
+        }
+        double min = used.getTotalMemoryMb() / totalMemory;
+        min = Math.min(min, used.getTotalCpu() / getTotalCpu());
+
+        for (int i = 0; i < otherResources.length; i++) {
+            if (otherResources[i] != 0.0) {
+                min = Math.min(min, used.otherResources[i] / 
otherResources[i]);
+            } else {
+                return 0.0; //0 will be the minimum, because we count values 
not in here as 0
+            }
+        }
+        return min * 100.0;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 633fb5c..5350005 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
@@ -55,7 +54,6 @@ public class RAS_Node {
     private SupervisorDetails sup;
     private final Cluster cluster;
     private final Set<WorkerSlot> originallyFreeSlots;
-    private final Map<String, Double> totalResources;
 
     public RAS_Node(
         String nodeId,
@@ -89,12 +87,6 @@ public class RAS_Node {
             this.sup = sup;
         }
 
-        if (isAlive) {
-            totalResources = getTotalResources();
-        } else {
-            totalResources = new HashMap<>();
-        }
-
         HashSet<String> freeById = new HashSet<>(slots.keySet());
         if (assignmentMap != null) {
             for (Map<String, Collection<ExecutorDetails>> assignment : 
assignmentMap.values()) {
@@ -367,7 +359,7 @@ public class RAS_Node {
             ws,
             exec,
             td,
-            this.getTotalAvailableResources(),
+            getTotalAvailableResources(),
             td.getTopologyWorkerMaxHeapSize()
             );
     }
@@ -429,52 +421,33 @@ public class RAS_Node {
      *
      * @return the available memory for this node
      */
-    public Double getAvailableMemoryResources() {
-        Map<String, Double> allAvailableResources = 
getTotalAvailableResources();
-        return allAvailableResources.getOrDefault(
-                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+    public double getAvailableMemoryResources() {
+        return getTotalAvailableResources().getTotalMemoryMb();
     }
 
     /**
      * Gets total resources for this node.
-     *
-     * @return Map<String, Double> of all resources
      */
-    public Map<String, Double> getTotalResources() {
+    public NormalizedResourceOffer getTotalResources() {
         if (sup != null) {
             return sup.getTotalResources();
         } else {
-            return new HashMap<>();
+            return new NormalizedResourceOffer();
         }
     }
 
     /**
      * Gets all available resources for this node.
      *
-     * @return Map<String, Double> of all resources
+     * @return All of the available resources.
      */
-    public Map<String, Double> getTotalAvailableResources() {
+    public NormalizedResourceOffer getTotalAvailableResources() {
         if (sup != null) {
-            Map<String, Double> totalResources = sup.getTotalResources();
-            Map<String, Double> scheduledResources = 
cluster.getAllScheduledResourcesForNode(sup.getId());
-            Map<String, Double> availableResources = new HashMap<>();
-            for (Entry resource : totalResources.entrySet()) {
-                if(resource.getKey() == 
Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME) {
-                    availableResources.put(resource.getKey().toString(),
-                            ObjectReader.getDouble(resource.getValue())
-                                    - 
(scheduledResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
0.0)
-                                    + 
scheduledResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
0.0))
-                    );
-                    continue;
-                }
-                availableResources.put(resource.getKey().toString(),
-                        ObjectReader.getDouble(resource.getValue())
-                                - 
scheduledResources.getOrDefault(resource.getKey(), 0.0));
-
-            }
+            NormalizedResourceOffer availableResources = new 
NormalizedResourceOffer(sup.getTotalResources());
+            
availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()));
             return availableResources;
         } else {
-            return new HashMap<>();
+            return new NormalizedResourceOffer();
         }
     }
 
@@ -483,7 +456,7 @@ public class RAS_Node {
      *
      * @return the total memory for this node
      */
-    public Double getTotalMemoryResources() {
+    public double getTotalMemoryResources() {
         if (sup != null) {
             return sup.getTotalMemory();
         } else {
@@ -497,7 +470,7 @@ public class RAS_Node {
      * @return the available cpu for this node
      */
     public double getAvailableCpuResources() {
-        return 
getTotalAvailableResources().getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 
0.0);
+        return getTotalAvailableResources().getTotalCpu();
     }
 
     /**
@@ -505,9 +478,9 @@ public class RAS_Node {
      *
      * @return the total cpu for this node
      */
-    public Double getTotalCpuResources() {
+    public double getTotalCpuResources() {
         if (sup != null) {
-            return sup.getTotalCPU();
+            return sup.getTotalCpu();
         } else {
             return 0.0;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 9b0ee15..83ee4cb 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -18,37 +18,38 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.storm.Config;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.ObjectReader;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.Constants.resourceNameMapping;
-
 public class ResourceUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(ResourceUtils.class);
 
-    public static Map<String, Map<String, Double>> 
getBoltsResources(StormTopology topology,
+    public static NormalizedResourceRequest getBoltResources(StormTopology topology, Map<String, Object> topologyConf,
+                                                              String componentId) {
+        if (topology.get_bolts() != null) {
+            Bolt bolt = topology.get_bolts().get(componentId); // Map.get: null when componentId is not a bolt here
+            return bolt == null ? null : new NormalizedResourceRequest(bolt.get_common(), topologyConf);
+        }
+        return null; // topology has no bolt map: nothing to build a request from
+    }
+
+    public static Map<String, NormalizedResourceRequest> 
getBoltsResources(StormTopology topology,
                                                                      
Map<String, Object> topologyConf) {
-        Map<String, Map<String, Double>> boltResources = new HashMap<>();
+        Map<String, NormalizedResourceRequest> boltResources = new HashMap<>();
         if (topology.get_bolts() != null) {
             for (Map.Entry<String, Bolt> bolt : 
topology.get_bolts().entrySet()) {
-                Map<String, Double> topologyResources = 
parseResources(bolt.getValue().get_common().get_json_conf());
-                checkInitialization(topologyResources, 
bolt.getValue().toString(), topologyConf);
+                NormalizedResourceRequest topologyResources = new 
NormalizedResourceRequest(bolt.getValue().get_common(), topologyConf);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Turned {} into {}", 
bolt.getValue().get_common().get_json_conf(), topologyResources);
                 }
@@ -58,13 +59,21 @@ public class ResourceUtils {
         return boltResources;
     }
 
-    public static Map<String, Map<String, Double>> 
getSpoutsResources(StormTopology topology,
+    public static NormalizedResourceRequest getSpoutResources(StormTopology topology, Map<String, Object> topologyConf,
+                                                              String componentId) {
+        if (topology.get_spouts() != null) {
+            SpoutSpec spout = topology.get_spouts().get(componentId); // Map.get: null when componentId is not a spout here
+            return spout == null ? null : new NormalizedResourceRequest(spout.get_common(), topologyConf);
+        }
+        return null; // topology has no spout map: nothing to build a request from
+    }
+
+    public static Map<String, NormalizedResourceRequest> 
getSpoutsResources(StormTopology topology,
                                                                       
Map<String, Object> topologyConf) {
-        Map<String, Map<String, Double>> spoutResources = new HashMap<>();
+        Map<String, NormalizedResourceRequest> spoutResources = new 
HashMap<>();
         if (topology.get_spouts() != null) {
             for (Map.Entry<String, SpoutSpec> spout : 
topology.get_spouts().entrySet()) {
-                Map<String, Double> topologyResources = 
parseResources(spout.getValue().get_common().get_json_conf());
-                checkInitialization(topologyResources, 
spout.getValue().toString(), topologyConf);
+                NormalizedResourceRequest topologyResources = new 
NormalizedResourceRequest(spout.getValue().get_common(), topologyConf);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Turned {} into {}", 
spout.getValue().get_common().get_json_conf(), topologyResources);
                 }
@@ -139,187 +148,4 @@ public class ResourceUtils {
         }
     }
 
-    public static void checkInitialization(Map<String, Double> 
topologyResources,
-                                           String componentId, Map<String, 
Object> topologyConf) {
-        StringBuilder msgBuilder = new StringBuilder();
-
-        Set<String> resourceNameSet = new HashSet<>();
-
-        resourceNameSet.add(
-                Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
-        );
-        resourceNameSet.add(
-                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
-        );
-        resourceNameSet.add(
-                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
-        );
-
-        Map<String, Double> topologyComponentResourcesMap =
-                (Map<String, Double>) topologyConf.getOrDefault(
-                        Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new 
HashMap<>());
-
-        resourceNameSet.addAll(topologyResources.keySet());
-        resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
-
-        for (String resourceName : resourceNameSet) {
-            msgBuilder.append(checkInitResource(topologyResources, 
topologyConf, topologyComponentResourcesMap, resourceName));
-        }
-
-        Map<String, Double> normalizedTopologyResources = 
normalizedResourceMap(topologyResources);
-        topologyResources.clear();
-        topologyResources.putAll(normalizedTopologyResources);
-
-        if (msgBuilder.length() > 0) {
-            String resourceDefaults = msgBuilder.toString();
-            LOG.debug(
-                    "Unable to extract resource requirement for Component {} 
\n Resources : {}",
-                    componentId, resourceDefaults);
-        }
-    }
-
-    private static String checkInitResource(Map<String, Double> 
topologyResources, Map topologyConf,
-                                            Map<String, Double> 
topologyComponentResourcesMap, String resourceName) {
-        StringBuilder msgBuilder = new StringBuilder();
-        String normalizedResourceName = 
resourceNameMapping.getOrDefault(resourceName, resourceName);
-        if (!topologyResources.containsKey(normalizedResourceName)) {
-            if (topologyConf.containsKey(resourceName)) {
-                Double resourceValue = 
ObjectReader.getDouble(topologyConf.get(resourceName));
-                if (resourceValue != null) {
-                    topologyResources.put(normalizedResourceName, 
resourceValue);
-                }
-            }
-
-            if 
(topologyComponentResourcesMap.containsKey(normalizedResourceName)) {
-                Double resourceValue = 
ObjectReader.getDouble(topologyComponentResourcesMap.get(resourceName));
-                if (resourceValue != null) {
-                    topologyResources.put(normalizedResourceName, 
resourceValue);
-                }
-            }
-        }
-
-        return msgBuilder.toString();
-    }
-
-    public static Map<String, Double> parseResources(String input) {
-        Map<String, Double> topologyResources = new HashMap<>();
-        JSONParser parser = new JSONParser();
-        LOG.debug("Input to parseResources {}", input);
-        try {
-            if (input != null) {
-                Object obj = parser.parse(input);
-                JSONObject jsonObject = (JSONObject) obj;
-
-                // Legacy resource parsing
-                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
-                    Double topoMemOnHeap = ObjectReader
-                            
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
 null);
-                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
topoMemOnHeap);
-                }
-                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) 
{
-                    Double topoMemOffHeap = ObjectReader
-                            
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
 null);
-                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
topoMemOffHeap);
-                }
-                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-                    Double topoCpu = 
ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
-                        null);
-                    
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
-                }
-
-                // If resource is also present in resources map will overwrite 
the above
-                if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
-                    Map<String, Number> rawResourcesMap =
-                            (Map<String, Number>) jsonObject.computeIfAbsent(
-                                    Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
(k) -> new HashMap<>());
-
-                    for (Map.Entry<String, Number> stringNumberEntry : 
rawResourcesMap.entrySet()) {
-                        topologyResources.put(
-                                stringNumberEntry.getKey(), 
stringNumberEntry.getValue().doubleValue());
-                    }
-
-
-                }
-            }
-        } catch (ParseException e) {
-            LOG.error("Failed to parse component resources is:" + 
e.toString(), e);
-            return null;
-        }
-        return normalizedResourceMap(topologyResources);
-    }
-
-    /**
-     * Calculate the sum of a collection of doubles.
-     * @param list collection of doubles
-     * @return the sum of of collection of doubles
-     */
-    public static double sum(Collection<Double> list) {
-        double sum = 0.0;
-        for (Double elem : list) {
-            sum += elem;
-        }
-        return sum;
-    }
-
-    /**
-     * Calculate the average of a collection of doubles.
-     * @param list a collection of doubles
-     * @return the average of collection of doubles
-     */
-    public static double avg(Collection<Double> list) {
-        return sum(list) / list.size();
-    }
-
-    /**
-     * Normalizes a supervisor resource map or topology details map's keys to 
universal resource names.
-     * @param resourceMap resource map of either Supervisor or Topology
-     * @return the resource map with common resource names
-     */
-    public static Map<String, Double> normalizedResourceMap(Map<String, 
Double> resourceMap) {
-        Map<String, Double> result = new HashMap();
-
-        result.putAll(resourceMap);
-        for (Map.Entry entry: resourceMap.entrySet()) {
-            if (resourceNameMapping.containsKey(entry.getKey())) {
-                result.put(resourceNameMapping.get(entry.getKey()), 
ObjectReader.getDouble(entry.getValue(), 0.0));
-                result.remove(entry.getKey());
-            }
-        }
-        return result;
-    }
-
-    public static Map<String, Double> addResources(Map<String, Double> 
resourceMap1, Map<String, Double> resourceMap2) {
-        Map<String, Double> result = new HashMap();
-
-        result.putAll(resourceMap1);
-
-        for (Map.Entry<String, Double> entry: resourceMap2.entrySet()) {
-            if (result.containsKey(entry.getKey())) {
-                result.put(entry.getKey(), 
ObjectReader.getDouble(entry.getValue(),
-                        0.0) + 
ObjectReader.getDouble(resourceMap1.get(entry.getKey()), 0.0));
-            } else {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-
-    }
-
-    public static Double getMinValuePresentInResourceMap(Map<String, Double> 
resourceMap) {
-        return Collections.min(resourceMap.values());
-    }
-
-    public static Map<String, Double> 
getPercentageOfTotalResourceMap(Map<String, Double> resourceMap, Map<String, 
Double> totalResourceMap) {
-        Map<String, Double> result = new HashMap();
-
-        for(Map.Entry<String, Double> entry: totalResourceMap.entrySet()) {
-            if (resourceMap.containsKey(entry.getKey())) {
-                result.put(entry.getKey(),  
(ObjectReader.getDouble(resourceMap.get(entry.getKey()))/ entry.getValue()) * 
100.0) ;
-            } else {
-                result.put(entry.getKey(), 0.0);
-            }
-        }
-        return result;
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 07c908e..67bc867 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -39,6 +39,7 @@ import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
 import org.apache.storm.scheduler.resource.ResourceUtils;
@@ -129,9 +130,9 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
      * a class to contain individual object resources as well as cumulative 
stats.
      */
     static class AllResources {
-        List<ObjectResources> objectResources = new 
LinkedList<ObjectResources>();
-        Map<String, Double> availableResourcesOverall = new HashMap<>();
-        Map<String, Double> totalResourcesOverall = new HashMap<>();
+        List<ObjectResources> objectResources = new LinkedList<>();
+        NormalizedResourceOffer availableResourcesOverall = new 
NormalizedResourceOffer();
+        NormalizedResourceOffer totalResourcesOverall = new 
NormalizedResourceOffer();
         String identifier;
 
         public AllResources(String identifier) {
@@ -139,19 +140,19 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
         }
 
         public AllResources(AllResources other) {
-            this(   null,
-                    new HashMap<>(other.availableResourcesOverall),
-                    new HashMap(other.totalResourcesOverall),
-                    other.identifier);
+            this (null,
+                new NormalizedResourceOffer(other.availableResourcesOverall),
+                new NormalizedResourceOffer(other.totalResourcesOverall),
+                other.identifier);
             List<ObjectResources> objectResourcesList = new ArrayList<>();
             for (ObjectResources objectResource : other.objectResources) {
                 objectResourcesList.add(new ObjectResources(objectResource));
             }
             this.objectResources = objectResourcesList;
-
         }
 
-        public AllResources(List<ObjectResources> objectResources, Map<String, 
Double> availableResourcesOverall, Map<String, Double> totalResourcesOverall, 
String identifier) {
+        public AllResources(List<ObjectResources> objectResources, 
NormalizedResourceOffer availableResourcesOverall,
+                            NormalizedResourceOffer totalResourcesOverall, 
String identifier) {
             this.objectResources = objectResources;
             this.availableResourcesOverall = availableResourcesOverall;
             this.totalResourcesOverall = totalResourcesOverall;
@@ -163,10 +164,10 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
      * class to keep track of resources on a rack or node.
      */
     static class ObjectResources {
-        String id;
-        Map<String, Double> availableResources = new HashMap<>();
-        Map<String, Double> totalResources = new HashMap<>();
-        double effectiveResources = 0.0;
+        public final String id;
+        public NormalizedResourceOffer availableResources = new 
NormalizedResourceOffer();
+        public NormalizedResourceOffer totalResources = new 
NormalizedResourceOffer();
+        public double effectiveResources = 0.0;
 
         public ObjectResources(String id) {
             this.id = id;
@@ -176,7 +177,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
             this(other.id, other.availableResources, other.totalResources, 
other.effectiveResources);
         }
 
-        public ObjectResources(String id, Map<String, Double> 
availableResources, Map<String, Double> totalResources, double 
effectiveResources) {
+        public ObjectResources(String id, NormalizedResourceOffer 
availableResources, NormalizedResourceOffer totalResources,
+                               double effectiveResources) {
             this.id = id;
             this.availableResources = availableResources;
             this.totalResources = totalResources;
@@ -220,10 +222,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
             node.totalResources = rasNode.getTotalResources();
 
             nodes.add(node);
-            allResources.availableResourcesOverall = 
ResourceUtils.addResources(
-                    allResources.availableResourcesOverall, 
node.availableResources);
-            allResources.totalResourcesOverall = ResourceUtils.addResources(
-                    allResources.totalResourcesOverall, node.totalResources);
+            
allResources.availableResourcesOverall.add(node.availableResources);
+            allResources.totalResourcesOverall.add(node.totalResources);
 
         }
 
@@ -234,7 +234,7 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
             allResources.totalResourcesOverall);
 
         String topoId = topologyDetails.getId();
-        return this.sortObjectResources(
+        return sortObjectResources(
             allResources,
             exec,
             topologyDetails,
@@ -243,7 +243,7 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
                 public int getNumExistingSchedule(String objectId) {
 
                     //Get execs already assigned in rack
-                    Collection<ExecutorDetails> execs = new 
LinkedList<ExecutorDetails>();
+                    Collection<ExecutorDetails> execs = new LinkedList<>();
                     if (cluster.getAssignmentById(topoId) != null) {
                         for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
                             
cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
@@ -331,13 +331,13 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
             racks.add(rack);
             for (String nodeId : nodeIds) {
                 RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId));
-                rack.availableResources = 
ResourceUtils.addResources(rack.availableResources, 
node.getTotalAvailableResources());
-                rack.totalResources = 
ResourceUtils.addResources(rack.totalResources, node.getTotalResources());
+                rack.availableResources.add(node.getTotalAvailableResources());
+                rack.totalResources.add(node.getTotalResources());
 
                 nodeIdToRackId.put(nodeId, rack.id);
 
-                allResources.totalResourcesOverall = 
ResourceUtils.addResources(allResources.totalResourcesOverall, 
rack.totalResources);
-                allResources.availableResourcesOverall = 
ResourceUtils.addResources(allResources.availableResourcesOverall, 
rack.availableResources);
+                allResources.totalResourcesOverall.add(rack.totalResources);
+                
allResources.availableResourcesOverall.add(rack.availableResources);
 
             }
         }
@@ -347,30 +347,27 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
             allResources.totalResourcesOverall);
 
         String topoId = topologyDetails.getId();
-        return this.sortObjectResources(
+        return sortObjectResources(
             allResources,
             exec,
             topologyDetails,
-            new ExistingScheduleFunc() {
-                @Override
-                public int getNumExistingSchedule(String objectId) {
-                    String rackId = objectId;
-                    //Get execs already assigned in rack
-                    Collection<ExecutorDetails> execs = new 
LinkedList<ExecutorDetails>();
-                    if (cluster.getAssignmentById(topoId) != null) {
-                        for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
-                            
cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
-                            String nodeId = entry.getValue().getNodeId();
-                            String hostname = idToNode(nodeId).getHostname();
-                            ExecutorDetails exec = entry.getKey();
-                            if (nodeIdToRackId.get(hostname) != null
-                                && 
nodeIdToRackId.get(hostname).equals(rackId)) {
-                                execs.add(exec);
-                            }
+            (objectId) -> {
+                String rackId = objectId;
+                //Get execs already assigned in rack
+                Collection<ExecutorDetails> execs = new LinkedList<>();
+                if (cluster.getAssignmentById(topoId) != null) {
+                    for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
+                        
cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                        String nodeId = entry.getValue().getNodeId();
+                        String hostname = idToNode(nodeId).getHostname();
+                        ExecutorDetails exec1 = entry.getKey();
+                        if (nodeIdToRackId.get(hostname) != null
+                            && nodeIdToRackId.get(hostname).equals(rackId)) {
+                            execs.add(exec1);
                         }
                     }
-                    return execs.size();
                 }
+                return execs.size();
             });
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index adf05f5..fc746b0 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -68,7 +68,7 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         Collection<ExecutorDetails> executorsNotScheduled = new 
HashSet<>(unassignedExecutors);
         List<String> favoredNodes = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
         List<String> unFavoredNodes = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
-        final List<ObjectResources> sortedNodes = this.sortAllNodes(td, null, 
favoredNodes, unFavoredNodes);
+        final List<ObjectResources> sortedNodes = sortAllNodes(td, null, 
favoredNodes, unFavoredNodes);
 
         for (ExecutorDetails exec : orderedExecutors) {
             LOG.debug(
@@ -127,32 +127,8 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
             final ExistingScheduleFunc existingScheduleFunc) {
 
         for (ObjectResources objectResources : allResources.objectResources) {
-            StringBuilder sb = new StringBuilder();
-            if 
(ResourceUtils.getMinValuePresentInResourceMap(objectResources.availableResources)
 <= 0) {
-                objectResources.effectiveResources = 0.0;
-            } else {
-                List<Double> values = new LinkedList<>();
-
-                Map<String, Double> percentageTotal = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                        objectResources.availableResources, 
allResources.availableResourcesOverall
-                );
-                for(Map.Entry<String, Double> percentageEntry : 
percentageTotal.entrySet()) {
-                    values.add(percentageEntry.getValue());
-                    sb.append(String.format("%s %f(%f%%) ", 
percentageEntry.getKey(),
-                            
objectResources.availableResources.get(percentageEntry.getKey()),
-                            percentageEntry.getValue())
-                    );
-
-                }
-
-                objectResources.effectiveResources = Collections.min(values);
-            }
-            LOG.debug(
-                    "{}: Avail [ {} ] Total [ {} ] effective resources: {}",
-                    objectResources.id,
-                    sb.toString(),
-                    objectResources.totalResources,
-                    objectResources.effectiveResources);
+            objectResources.effectiveResources =
+                
allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
         }
 
         TreeSet<ObjectResources> sortedObjectResources =
@@ -169,13 +145,8 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
                         } else if (o1.effectiveResources < 
o2.effectiveResources) {
                             return 1;
                         } else {
-                            Collection<Double> o1Values = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                                    o1.availableResources, 
allResources.availableResourcesOverall).values();
-                            Collection<Double> o2Values = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                                    o2.availableResources, 
allResources.availableResourcesOverall).values();
-
-                            double o1Avg = ResourceUtils.avg(o1Values);
-                            double o2Avg = ResourceUtils.avg(o2Values);
+                            double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+                            double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
 
                             if (o1Avg > o2Avg) {
                                 return -1;

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index a4de7c2..893e289 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -18,8 +18,6 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -27,7 +25,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
-
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
@@ -133,31 +130,7 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
             final AllResources allResources, ExecutorDetails exec, 
TopologyDetails topologyDetails,
             final ExistingScheduleFunc existingScheduleFunc) {
 
-        Map<String, Double> requestedResources = 
topologyDetails.getTotalResources(exec);
         AllResources affinityBasedAllResources = new 
AllResources(allResources);
-        for (ObjectResources objectResources : 
affinityBasedAllResources.objectResources) {
-            StringBuilder sb = new StringBuilder();
-            List<Double> values = new LinkedList<>();
-
-            for (Map.Entry<String, Double> availableResourcesEntry : 
objectResources.availableResources.entrySet()) {
-                if 
(!requestedResources.containsKey(availableResourcesEntry.getKey())) {
-                    
objectResources.availableResources.put(availableResourcesEntry.getKey(), -1.0 * 
availableResourcesEntry.getValue());
-                }
-            }
-
-            Map<String, Double> percentageTotal = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                    objectResources.availableResources, 
allResources.availableResourcesOverall
-            );
-            for(Map.Entry<String, Double> percentageEntry : 
percentageTotal.entrySet()) {
-                values.add(percentageEntry.getValue());
-                sb.append(String.format("%s %f(%f%%) ", 
percentageEntry.getKey(),
-                        
objectResources.availableResources.get(percentageEntry.getKey()),
-                        percentageEntry.getValue())
-                );
-
-            }
-
-        }
 
         TreeSet<ObjectResources> sortedObjectResources =
                 new TreeSet<>((o1, o2) -> {
@@ -168,13 +141,8 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
                     } else if (execsScheduled1 < execsScheduled2) {
                         return 1;
                     } else {
-                        Collection<Double> o1Values = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                                o1.availableResources, 
allResources.availableResourcesOverall).values();
-                        Collection<Double> o2Values = 
ResourceUtils.getPercentageOfTotalResourceMap(
-                                o2.availableResources, 
allResources.availableResourcesOverall).values();
-
-                        double o1Avg = ResourceUtils.avg(o1Values);
-                        double o2Avg = ResourceUtils.avg(o2Values);
+                        double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+                        double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
 
                         if (o1Avg > o2Avg) {
                             return -1;

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java 
b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index bd4c4fe..a64c9b4 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -73,6 +73,7 @@ import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.thrift.TException;
@@ -715,30 +716,33 @@ public class ServerUtils {
         return false;
     }
 
-    public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> 
topoConf, StormTopology topology) throws InvalidTopologyException {
+    public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> 
topoConf, StormTopology topology)
+        throws InvalidTopologyException {
         return (int) 
Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
                 
ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
     }
 
-    public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, 
Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+    public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, 
Object> topoConf, StormTopology topology)
+        throws InvalidTopologyException {
         Map<String, Integer> componentParallelism = 
getComponentParallelism(topoConf, topology);
         double totalMemoryRequired = 0.0;
 
-        for(Map.Entry<String, Map<String, Double>> entry: 
ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
+        for (Map.Entry<String, NormalizedResourceRequest> entry: 
ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
             int parallelism = 
componentParallelism.getOrDefault(entry.getKey(), 1);
-            double memoryRequirement = 
entry.getValue().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+            double memoryRequirement = entry.getValue().getOnHeapMemoryMb();
             totalMemoryRequired += memoryRequirement * parallelism;
         }
 
-        for(Map.Entry<String, Map<String, Double>> entry: 
ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) {
+        for (Map.Entry<String, NormalizedResourceRequest> entry: 
ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) {
             int parallelism = 
componentParallelism.getOrDefault(entry.getKey(), 1);
-            double memoryRequirement = 
entry.getValue().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+            double memoryRequirement = entry.getValue().getOnHeapMemoryMb();
             totalMemoryRequired += memoryRequirement * parallelism;
         }
         return totalMemoryRequired;
     }
 
-    public static Map<String, Integer> getComponentParallelism(Map<String, 
Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+    public static Map<String, Integer> getComponentParallelism(Map<String, 
Object> topoConf, StormTopology topology)
+        throws InvalidTopologyException {
         Map<String, Integer> ret = new HashMap<>();
         Map<String, Object> components = StormCommon.allComponents(topology);
         for (Map.Entry<String, Object> entry : components.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 16c4fb2..38b68c0 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -333,7 +333,7 @@ public class TestResourceAwareScheduler {
             executorsOnSupervisor.add(entry.getKey());
         }
         for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : 
supervisorToExecutors.entrySet()) {
-            Double supervisorTotalCpu = entry.getKey().getTotalCPU();
+            Double supervisorTotalCpu = entry.getKey().getTotalCpu();
             Double supervisorTotalMemory = entry.getKey().getTotalMemory();
             Double supervisorUsedCpu = 0.0;
             Double supervisorUsedMemory = 0.0;
@@ -501,6 +501,10 @@ public class TestResourceAwareScheduler {
 
     @Test
     public void testHeterogeneousCluster() {
+        Map<String, Double> test = new HashMap<>();
+        test.put("gpu.count", 0.0);
+        new NormalizedResourceOffer(test);
+        LOG.info("\n\n\t\ttestHeterogeneousCluster");
         INimbus iNimbus = new INimbusTest();
         Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node
         resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0);
@@ -509,10 +513,10 @@ public class TestResourceAwareScheduler {
         resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
         resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
 
-        resourceMap1 = ResourceUtils.normalizedResourceMap(resourceMap1);
-        resourceMap2 = ResourceUtils.normalizedResourceMap(resourceMap2);
+        resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1);
+        resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2);
 
-        Map<String, SupervisorDetails> supMap = new HashMap<String, 
SupervisorDetails>();
+        Map<String, SupervisorDetails> supMap = new HashMap<>();
         for (int i = 0; i < 2; i++) {
             List<Number> ports = new LinkedList<>();
             for (int j = 0; j < 4; j++) {
@@ -521,6 +525,7 @@ public class TestResourceAwareScheduler {
             SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" 
+ i, null, ports, i == 0 ? resourceMap1 : resourceMap2);
             supMap.put(sup.getId(), sup);
         }
+        LOG.info("SUPERVISORS = {}", supMap);
 
         // topo1 has one single huge task that can not be handled by the small-super
         TopologyBuilder builder1 = new TopologyBuilder();
@@ -569,6 +574,7 @@ public class TestResourceAwareScheduler {
 
         // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        LOG.info("\n\n\t\tScheduling topologies 1, 2 and 3");
         Topologies topologies = new Topologies(topology1, topology2, 
topology3);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), 
topologies, config1);
         rs.prepare(config1);
@@ -583,14 +589,17 @@ public class TestResourceAwareScheduler {
 
         final Double EPSILON = 0.0001;
         for (SupervisorDetails supervisor : supMap.values()) {
-            Double cpuAvailable = supervisor.getTotalCPU();
+            Double cpuAvailable = supervisor.getTotalCpu();
             Double memAvailable = supervisor.getTotalMemory();
             Double cpuUsed = superToCpu.get(supervisor);
             Double memUsed = superToMem.get(supervisor);
-            assertTrue((Math.abs(memAvailable - memUsed) < EPSILON) || 
(Math.abs(cpuAvailable - cpuUsed) < EPSILON));
+
+            assertTrue(supervisor.getId() + " MEM: "+ memAvailable + " == " + 
memUsed + " OR CPU: " + cpuAvailable + " == " + cpuUsed,
+                (Math.abs(memAvailable - memUsed) < EPSILON) || 
(Math.abs(cpuAvailable - cpuUsed) < EPSILON));
         }
         // end of Test1
 
+        LOG.warn("\n\n\t\tSwitching to topologies 1, 2 and 4");
         // Test2: Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled
         topologies = new Topologies(topology1, topology2, topology4);
         cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, 
config1);
@@ -598,17 +607,21 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
         int numTopologiesAssigned = 0;
         if (cluster.getStatusMap().get(topology1.getId()).equals("Running - 
Fully Scheduled by DefaultResourceAwareStrategy")) {
+            LOG.info("TOPO 1 scheduled");
             numTopologiesAssigned++;
         }
         if (cluster.getStatusMap().get(topology2.getId()).equals("Running - 
Fully Scheduled by DefaultResourceAwareStrategy")) {
+            LOG.info("TOPO 2 scheduled");
             numTopologiesAssigned++;
         }
         if (cluster.getStatusMap().get(topology4.getId()).equals("Running - 
Fully Scheduled by DefaultResourceAwareStrategy")) {
+            LOG.info("TOPO 4 scheduled");
             numTopologiesAssigned++;
         }
         assertEquals(2, numTopologiesAssigned);
         //end of Test2
 
+        LOG.info("\n\n\t\tScheduling just topo 5");
         //Test3: "Launch topo5 only, both mem and cpu should be exactly used up"
         topologies = new Topologies(topology5);
         cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, 
config1);
@@ -617,7 +630,7 @@ public class TestResourceAwareScheduler {
         superToCpu = getSupervisorToCpuUsage(cluster, topologies);
         superToMem = getSupervisorToMemoryUsage(cluster, topologies);
         for (SupervisorDetails supervisor : supMap.values()) {
-            Double cpuAvailable = supervisor.getTotalCPU();
+            Double cpuAvailable = supervisor.getTotalCpu();
             Double memAvailable = supervisor.getTotalMemory();
             Double cpuUsed = superToCpu.get(supervisor);
             Double memUsed = superToMem.get(supervisor);

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7fa3c3b..4a2e644 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -65,7 +65,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap;
+import static 
org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
 
 public class TestUtilsForResourceAwareScheduler {
     private static final Logger LOG = 
LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/6a4aeb91/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 5990d2c..0f7fed9 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -201,7 +201,7 @@ public class TestDefaultResourceAwareStrategy {
     @Test
     public void testMultipleRacks() {
 
-        final Map<String, SupervisorDetails> supMap = new HashMap<String, 
SupervisorDetails>();
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
         final Map<String, SupervisorDetails> supMapRack1 = genSupervisors(10, 
4, 0, 400, 8000);
         //generate another rack of supervisors with less resources
         final Map<String, SupervisorDetails> supMapRack2 = genSupervisors(10, 
4, 10, 200, 4000);
@@ -229,7 +229,7 @@ public class TestDefaultResourceAwareStrategy {
         DNSToSwitchMapping TestNetworkTopographyPlugin = new 
DNSToSwitchMapping() {
             @Override
             public Map<String, String> resolve(List<String> names) {
-                Map<String, String> ret = new HashMap<String, String>();
+                Map<String, String> ret = new HashMap<>();
                 for (SupervisorDetails sup : supMapRack1.values()) {
                     ret.put(sup.getHost(), "rack-0");
                 }
@@ -267,7 +267,7 @@ public class TestDefaultResourceAwareStrategy {
             String rack = entry.getValue();
             List<String> nodesForRack = rackToNodes.get(rack);
             if (nodesForRack == null) {
-                nodesForRack = new ArrayList<String>();
+                nodesForRack = new ArrayList<>();
                 rackToNodes.put(rack, nodesForRack);
             }
             nodesForRack.add(hostName);
@@ -277,7 +277,8 @@ public class TestDefaultResourceAwareStrategy {
         DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy();
         
         rs.prepare(cluster);
-        TreeSet<ObjectResources> sortedRacks= rs.sortRacks(null, topo1);
+        TreeSet<ObjectResources> sortedRacks = rs.sortRacks(null, topo1);
+        LOG.info("Sorted Racks {}", sortedRacks);
 
         Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
         Iterator<ObjectResources> it = sortedRacks.iterator();

Reply via email to