Merge two functions (getSupervisorToCpuUsage and getSupervisorToMemoryUsage) into getSupervisorToResourceUsage
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f1161c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f1161c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f1161c

Branch: refs/heads/master
Commit: 58f1161cb077f90fb64e8a68f9da9c9aedf4f7dd
Parents: 6340601
Author: zhuol <[email protected]>
Authored: Thu Mar 24 17:31:13 2016 -0500
Committer: zhuol <[email protected]>
Committed: Thu Mar 24 17:31:13 2016 -0500

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 10 +++--
 .../TestUtilsForResourceAwareScheduler.java     | 44 ++++----------------
 2 files changed, 13 insertions(+), 41 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 28fd491..9cfdc6e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -616,8 +616,9 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
 
-        Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
 
         final Double EPSILON = 0.0001;
         for (SupervisorDetails supervisor : supMap.values()) {
@@ -658,8 +659,9 @@ public class TestResourceAwareScheduler {
         topologies = new Topologies(topoMap);
         rs.prepare(config1);
         rs.schedule(topologies, cluster);
-        superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        superToCpu = new HashMap<>();
+        superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
         for (SupervisorDetails supervisor : supMap.values()) {
             Double cpuAvailable = supervisor.getTotalCPU();
             Double memAvailable = supervisor.getTotalMemory();

http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7cd21ce..8b7f9c8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -289,46 +289,14 @@ public class TestUtilsForResourceAwareScheduler {
         return ret;
     }
 
-    public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
-        Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
-        Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
-        for (SupervisorDetails supervisor : supervisors) {
-            superToMem.put(supervisor, 0.0);
-        }
-
-        for (SchedulerAssignment assignment : assignments) {
-            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
-            Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
-            TopologyDetails topology = topologies.getById(assignment.getTopologyId());
-            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
-                executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
-            }
-            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
-                List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
-                if (executorsOnSupervisor == null) {
-                    executorsOnSupervisor = new ArrayList<>();
-                    supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
-                }
-                executorsOnSupervisor.add(entry.getKey());
-            }
-            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
-                Double supervisorUsedMemory = 0.0;
-                for (ExecutorDetails executor: entry.getValue()) {
-                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
-                }
-                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
-            }
-        }
-        return superToMem;
-    }
-
-    public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+    public static void getSupervisorToResourceUsage(Cluster cluster, Topologies topologies,
+                                                    Map<SupervisorDetails, Double> superToCpu,
+                                                    Map<SupervisorDetails, Double> superToMem) {
         Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
         Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
         for (SupervisorDetails supervisor : supervisors) {
             superToCpu.put(supervisor, 0.0);
+            superToMem.put(supervisor, 0.0);
         }
 
         for (SchedulerAssignment assignment : assignments) {
@@ -348,12 +316,14 @@ public class TestUtilsForResourceAwareScheduler {
             }
             for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
                 Double supervisorUsedCpu = 0.0;
+                Double supervisorUsedMemory = 0.0;
                 for (ExecutorDetails executor: entry.getValue()) {
                     supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
                 }
                 superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
             }
         }
-        return superToCpu;
     }
 }
