Rebased and added changes suggested by @pdread100.

Tested again on a 4-node cluster; all appears well.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/e60bbaa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/e60bbaa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/e60bbaa7

Branch: refs/heads/master
Commit: e60bbaa7af4dc619e7b9bb8b6d99d95a677ba357
Parents: bfbefc3
Author: DarinJ <n...@thanks.com>
Authored: Fri Jul 10 01:00:12 2015 -0400
Committer: DarinJ <n...@thanks.com>
Committed: Fri Jul 10 01:19:05 2015 -0400

----------------------------------------------------------------------
 .../handlers/ResourceOffersEventHandler.java    | 113 ++++++++++++-------
 1 file changed, 74 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e60bbaa7/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index dae3a03..c009ce8 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -80,6 +80,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
                         NodeTask taskToLaunch = schedulerState
                                 .getTask(pendingTaskId);
                         NMProfile profile = taskToLaunch.getProfile();
+
                         if (matches(offer, profile)
                                 && SchedulerUtils.isUniqueHostname(offer,
                                 schedulerState.getActiveTasks())) {
@@ -124,54 +125,34 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     }
 
     private boolean matches(Offer offer, NMProfile profile) {
-        double cpus = -1;
-        double mem = -1;
-        int ports = 0;
+        Map<String, Object> results = new HashMap<String, Object>(5);
+
         for (Resource resource : offer.getResourcesList()) {
-            if (resource.getName().equals("cpus")) {
-                if (resource.getType().equals(Value.Type.SCALAR)) {
-                    cpus = resource.getScalar().getValue();
-                } else {
-                    LOGGER.error("Cpus resource was not a scalar: {}", resource
-                            .getType().toString());
-                }
-            } else if (resource.getName().equals("mem")) {
-                if (resource.getType().equals(Value.Type.SCALAR)) {
-                    mem = resource.getScalar().getValue();
-                } else {
-                    LOGGER.error("Mem resource was not a scalar: {}", resource
-                            .getType().toString());
-                }
-            } else if (resource.getName().equals("disk")) {
-                LOGGER.warn("Ignoring disk resources from offer");
-            } else if (resource.getName().equals("ports")) {
-                if (resource.getType().equals(Value.Type.RANGES)) {
-                    Value.Ranges ranges =  resource.getRanges();
-                    for (Value.Range range : ranges.getRangeList()) {
-                        if (range.getBegin() < range.getEnd()) {
-                            ports += range.getEnd() - range.getBegin() + 1;
-                        }
-                    }
-                } else {
-                    LOGGER.error("ports resource was not Ranges: {}", resource
-                            .getType().toString());
-                }
+            if (resourceEvaluators.containsKey(resource.getName())) {
+                resourceEvaluators.get(resource.getName()).eval(resource, 
results);
             } else {
                 LOGGER.warn("Ignoring unknown resource type: {}",
                         resource.getName());
             }
         }
+        double cpus = (Double) results.get("cpus");
+        double mem = (Double) results.get("mem");
+        int ports = (Integer) results.get("ports");
 
-        if (cpus < 0) {
-            LOGGER.error("No cpus resource present");
-        }
-        if (mem < 0) {
-            LOGGER.error("No mem resource present");
-        }
-        if (ports < 0) {
-            LOGGER.error("No port resources present");
+        checkResource(cpus < 0, "cpus");
+        checkResource(mem < 0, "mem");
+        checkResource(ports < 0, "port");
+
+        return checkAggregates(offer, profile, ports, cpus, mem);
+    }
+
+    private void checkResource(boolean fail, String resource) {
+        if (fail) {
+            LOGGER.info("No " + resource + " resources present");
         }
+    }
 
+    private boolean checkAggregates(Offer offer, NMProfile profile, int ports, 
double cpus, double mem) {
         Map<String, String> requestAttributes = new HashMap<>();
 
         if (taskUtils.getAggregateCpus(profile) <= cpus
@@ -185,4 +166,58 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
             return false;
         }
     }
+
+    private static Double scalarToDouble(Resource resource, String id) {
+        Double value = new Double(0.0);
+        if (resource.getType().equals(Value.Type.SCALAR)) {
+            value = new Double(resource.getScalar().getValue());
+        } else {
+            LOGGER.error(id + " resource was not a scalar: {}", resource
+                    .getType().toString());
+        }
+        return value;
+    }
+
+    private interface EvalResources {
+        public void eval(Resource resource, Map<String, Object>results);
+    }
+
+    private static Map<String, EvalResources> resourceEvaluators;
+
+    static {
+        resourceEvaluators = new HashMap<String, EvalResources>(4);
+        resourceEvaluators.put("cpus", new EvalResources() {
+            public void eval(Resource resource, Map<String, Object> results) {
+                results.put("cpus", scalarToDouble(resource, "cpus"));
+            }
+        });
+        resourceEvaluators.put("mem", new EvalResources() {
+            public void eval(Resource resource, Map<String, Object> results) {
+                results.put("mem", scalarToDouble(resource, "mem"));
+            }
+        });
+        resourceEvaluators.put("disk", new EvalResources() {
+            public void eval(Resource resource, Map<String, Object> results) {
+            }
+        });
+        resourceEvaluators.put("ports", new EvalResources() {
+            public void eval(Resource resource, Map<String, Object> results) {
+                int ports = 0;
+                if (resource.getType().equals(Value.Type.RANGES)) {
+                    Value.Ranges ranges = resource.getRanges();
+                    for (Value.Range range : ranges.getRangeList()) {
+                        if (range.getBegin() < range.getEnd()) {
+                            ports += range.getEnd() - range.getBegin() + 1;
+                        }
+                    }
+
+                } else {
+                    LOGGER.error("ports resource was not Ranges: {}", resource
+                            .getType().toString());
+
+                }
+                results.put("ports", Integer.valueOf(ports));
+            }
+        });
+    }
 }

Reply via email to