Rebased and added changes suggested by @pdread100. Tested again on 4 node cluster all appears well
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/e60bbaa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/e60bbaa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/e60bbaa7 Branch: refs/heads/master Commit: e60bbaa7af4dc619e7b9bb8b6d99d95a677ba357 Parents: bfbefc3 Author: DarinJ <n...@thanks.com> Authored: Fri Jul 10 01:00:12 2015 -0400 Committer: DarinJ <n...@thanks.com> Committed: Fri Jul 10 01:19:05 2015 -0400 ---------------------------------------------------------------------- .../handlers/ResourceOffersEventHandler.java | 113 ++++++++++++------- 1 file changed, 74 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e60bbaa7/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index dae3a03..c009ce8 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -80,6 +80,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv NodeTask taskToLaunch = schedulerState .getTask(pendingTaskId); NMProfile profile = taskToLaunch.getProfile(); + if (matches(offer, profile) && SchedulerUtils.isUniqueHostname(offer, schedulerState.getActiveTasks())) { @@ -124,54 +125,34 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } private boolean matches(Offer offer, NMProfile profile) { - double cpus = -1; - double mem = -1; - int ports = 0; + Map<String, Object> results = new HashMap<String, Object>(5); + for (Resource resource : offer.getResourcesList()) { - if (resource.getName().equals("cpus")) { - if (resource.getType().equals(Value.Type.SCALAR)) { - cpus = resource.getScalar().getValue(); - } else { - LOGGER.error("Cpus resource was not a scalar: {}", resource - .getType().toString()); - } - } else if (resource.getName().equals("mem")) { - if (resource.getType().equals(Value.Type.SCALAR)) { - mem = resource.getScalar().getValue(); - } else { - LOGGER.error("Mem resource was not a scalar: {}", resource - .getType().toString()); - } - } else if (resource.getName().equals("disk")) { - LOGGER.warn("Ignoring disk resources from offer"); - } else if (resource.getName().equals("ports")) { - if (resource.getType().equals(Value.Type.RANGES)) { - Value.Ranges ranges = resource.getRanges(); - for (Value.Range range : ranges.getRangeList()) { - if (range.getBegin() < range.getEnd()) { - ports += range.getEnd() - range.getBegin() + 1; - } - } - } else { - LOGGER.error("ports resource was not Ranges: {}", resource - .getType().toString()); - } + if (resourceEvaluators.containsKey(resource.getName())) { + resourceEvaluators.get(resource.getName()).eval(resource, results); } else { LOGGER.warn("Ignoring unknown resource type: {}", resource.getName()); } } + double cpus = (Double) results.get("cpus"); + double mem = (Double) results.get("mem"); + int ports = (Integer) results.get("ports"); - if (cpus < 0) { - LOGGER.error("No cpus resource present"); - } - if (mem < 0) { - LOGGER.error("No mem resource present"); - } - if (ports < 0) { - LOGGER.error("No port resources present"); + checkResource(cpus < 0, "cpus"); + checkResource(mem < 0, "mem"); + checkResource(ports < 0, "port"); + + return checkAggregates(offer, profile, ports, cpus, mem); + } + + private void checkResource(boolean fail, String resource) { + if (fail) { + LOGGER.info("No " + resource + " resources present"); } + } + private boolean checkAggregates(Offer offer, NMProfile profile, int ports, double cpus, double mem) { Map<String, String> requestAttributes = new HashMap<>(); if (taskUtils.getAggregateCpus(profile) <= cpus @@ -185,4 +166,58 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv return false; } } + + private static Double scalarToDouble(Resource resource, String id) { + Double value = new Double(0.0); + if (resource.getType().equals(Value.Type.SCALAR)) { + value = new Double(resource.getScalar().getValue()); + } else { + LOGGER.error(id + " resource was not a scalar: {}", resource + .getType().toString()); + } + return value; + } + + private interface EvalResources { + public void eval(Resource resource, Map<String, Object>results); + } + + private static Map<String, EvalResources> resourceEvaluators; + + static { + resourceEvaluators = new HashMap<String, EvalResources>(4); + resourceEvaluators.put("cpus", new EvalResources() { + public void eval(Resource resource, Map<String, Object> results) { + results.put("cpus", scalarToDouble(resource, "cpus")); + } + }); + resourceEvaluators.put("mem", new EvalResources() { + public void eval(Resource resource, Map<String, Object> results) { + results.put("mem", scalarToDouble(resource, "mem")); + } + }); + resourceEvaluators.put("disk", new EvalResources() { + public void eval(Resource resource, Map<String, Object> results) { + } + }); + resourceEvaluators.put("ports", new EvalResources() { + public void eval(Resource resource, Map<String, Object> results) { + int ports = 0; + if (resource.getType().equals(Value.Type.RANGES)) { + Value.Ranges ranges = resource.getRanges(); + for (Value.Range range : ranges.getRangeList()) { + if (range.getBegin() < range.getEnd()) { + ports += range.getEnd() - range.getBegin() + 1; + } + } + + } else { + LOGGER.error("ports resource was not Ranges: {}", resource + .getType().toString()); + + } + results.put("ports", Integer.valueOf(ports)); + } + }); + } }