http://git-wip-us.apache.org/repos/asf/storm/blob/caf525e6/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index fbc0421..bc69725 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -22,9 +22,11 @@ import org.apache.storm.Config; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ExecutorDetails; import org.apache.storm.scheduler.INimbus; import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; @@ -45,6 +47,7 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -112,12 +115,11 @@ public class TestUtilsForResourceAwareScheduler { public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology) { Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>(); int startTask = 0; - int endTask = 1; + int endTask = 0; for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) { SpoutSpec spout = entry.getValue(); String spoutId = entry.getKey(); int spoutParallelism = spout.get_common().get_parallelism_hint(); - for (int i = 0; i < spoutParallelism; i++) { retMap.put(new ExecutorDetails(startTask, endTask), spoutId); startTask++; @@ -296,4 +298,72 @@ public class TestUtilsForResourceAwareScheduler { } return ret; } + + public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) { + Map<SupervisorDetails, Double> superToMem = new HashMap<>(); + Collection<SchedulerAssignment> assignments = cluster.getAssignments().values(); + Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); + for (SupervisorDetails supervisor : supervisors) { + superToMem.put(supervisor, 0.0); + } + + for (SchedulerAssignment assignment : assignments) { + Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>(); + Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>(); + TopologyDetails topology = topologies.getById(assignment.getTopologyId()); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) { + executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId())); + } + for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) { + List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue()); + if (executorsOnSupervisor == null) { + executorsOnSupervisor = new ArrayList<>(); + supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor); + } + executorsOnSupervisor.add(entry.getKey()); + } + for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) { + Double supervisorUsedMemory = 0.0; + for (ExecutorDetails executor: entry.getValue()) { + supervisorUsedMemory += topology.getTotalMemReqTask(executor); + } + superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory); + } + } + return superToMem; + } + + public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) { + Map<SupervisorDetails, Double> superToCpu = new HashMap<>(); + Collection<SchedulerAssignment> assignments = cluster.getAssignments().values(); + Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); + for (SupervisorDetails supervisor : supervisors) { + superToCpu.put(supervisor, 0.0); + } + + for (SchedulerAssignment assignment : assignments) { + Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>(); + Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>(); + TopologyDetails topology = topologies.getById(assignment.getTopologyId()); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) { + executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId())); + } + for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) { + List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue()); + if (executorsOnSupervisor == null) { + executorsOnSupervisor = new ArrayList<>(); + supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor); + } + executorsOnSupervisor.add(entry.getKey()); + } + for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) { + Double supervisorUsedCpu = 0.0; + for (ExecutorDetails executor: entry.getValue()) { + supervisorUsedCpu += topology.getTotalCpuReqTask(executor); + } + superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu); + } + } + return superToCpu; + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/caf525e6/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java new file mode 100644 index 0000000..1d199b1 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java @@ -0,0 +1,736 @@ +package org.apache.storm.scheduler.resource.strategies.eviction; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.INimbus; +import org.apache.storm.scheduler.SchedulerAssignmentImpl; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.resource.ResourceAwareScheduler; +import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by jerrypeng on 10/6/16. + */ +public class TestDefaultEvictionStrategy { + + private static int currentTime = 1450418597; + + /** + * The resources in the cluster are limited. In the first round of scheduling, all resources in the cluster is used. + * User jerry submits another toploogy. Since user jerry has his resource guarantees satisfied, and user bobby + * has exceeded his resource guarantee, topo-3 from user bobby should be evicted. + */ + @Test + public void testEviction() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<String, Number>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap); + Config config = new Config(); + config.putAll(Utils.readDefaultConfig()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); + Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); + resourceUserPool.put("jerry", new HashMap<String, Number>()); + resourceUserPool.get("jerry").put("cpu", 200.0); + resourceUserPool.get("jerry").put("memory", 2000.0); + + resourceUserPool.put("bobby", new HashMap<String, Number>()); + resourceUserPool.get("bobby").put("cpu", 100.0); + resourceUserPool.get("bobby").put("memory", 1000.0); + + resourceUserPool.put("derek", new HashMap<String, Number>()); + resourceUserPool.get("derek").put("cpu", 200.0); + resourceUserPool.get("derek").put("memory", 2000.0); + + config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20); + + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); + + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29); + + Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); + topoMap.put(topo1.getId(), topo1); + topoMap.put(topo2.getId(), topo2); + topoMap.put(topo3.getId(), topo3); + topoMap.put(topo4.getId(), topo4); + + Topologies topologies = new Topologies(topoMap); + + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + + //user jerry submits another topology + topoMap.put(topo6.getId(), topo6); + topologies = new Topologies(topoMap); + + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) { + Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("correct topology to evict", "topo-3", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName()); + } + + @Test + public void TestEvictMultipleTopologies() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<String, Number>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap); + Config config = new Config(); + config.putAll(Utils.readDefaultConfig()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); + Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); + resourceUserPool.put("jerry", new HashMap<String, Number>()); + resourceUserPool.get("jerry").put("cpu", 200.0); + resourceUserPool.get("jerry").put("memory", 2000.0); + + resourceUserPool.put("derek", new HashMap<String, Number>()); + resourceUserPool.get("derek").put("cpu", 100.0); + resourceUserPool.get("derek").put("memory", 1000.0); + + config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); + + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29); + + Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); + topoMap.put(topo2.getId(), topo2); + topoMap.put(topo3.getId(), topo3); + topoMap.put(topo4.getId(), topo4); + topoMap.put(topo5.getId(), topo5); + + Topologies topologies = new Topologies(topoMap); + + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + + //user jerry submits another topology + topoMap.put(topo1.getId(), topo1); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) { + Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size()); + } + + /** + * Eviction order: + * topo-3: since user bobby don't have any resource guarantees and topo-3 is the lowest priority for user bobby + * topo-2: since user bobby don't have any resource guarantees and topo-2 is the next lowest priority for user bobby + * topo-5: since user derek has exceeded his resource guarantee while user jerry has not. topo-5 and topo-4 has the same priority + * but topo-4 was submitted earlier thus we choose that one to evict + */ + @Test + public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<String, Number>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap); + Config config = new Config(); + config.putAll(Utils.readDefaultConfig()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); + Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); + resourceUserPool.put("jerry", new HashMap<String, Number>()); + resourceUserPool.get("jerry").put("cpu", 300.0); + resourceUserPool.get("jerry").put("memory", 3000.0); + + resourceUserPool.put("derek", new HashMap<String, Number>()); + resourceUserPool.get("derek").put("cpu", 100.0); + resourceUserPool.get("derek").put("memory", 1000.0); + + config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); + + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29); + + Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); + topoMap.put(topo2.getId(), topo2); + topoMap.put(topo3.getId(), topo3); + topoMap.put(topo4.getId(), topo4); + topoMap.put(topo5.getId(), topo5); + + Topologies topologies = new Topologies(topoMap); + + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + + //user jerry submits another topology + topoMap.put(topo1.getId(), topo1); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) { + Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3"); + + topoMap.put(topo6.getId(), topo6); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) { + Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size()); + + Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null); + Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null); + + topoMap.put(topo7.getId(), topo7); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) { + Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4"); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) { + Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size()); + + Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null); + Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null); + } + + /** + * If topologies from other users cannot be evicted to make space + * check if there is a topology with lower priority that can be evicted from the current user + */ + @Test + public void TestEvictTopologyFromItself() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<String, Number>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap); + Config config = new Config(); + config.putAll(Utils.readDefaultConfig()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); + Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); + resourceUserPool.put("jerry", new HashMap<String, Number>()); + resourceUserPool.get("jerry").put("cpu", 200.0); + resourceUserPool.get("jerry").put("memory", 2000.0); + + resourceUserPool.put("bobby", new HashMap<String, Number>()); + resourceUserPool.get("bobby").put("cpu", 100.0); + resourceUserPool.get("bobby").put("memory", 1000.0); + + resourceUserPool.put("derek", new HashMap<String, Number>()); + resourceUserPool.get("derek").put("cpu", 100.0); + resourceUserPool.get("derek").put("memory", 1000.0); + + config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); + + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29); + + Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); + topoMap.put(topo1.getId(), topo1); + topoMap.put(topo2.getId(), topo2); + topoMap.put(topo5.getId(), topo5); + topoMap.put(topo6.getId(), topo6); + + Topologies topologies = new Topologies(topoMap); + + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + + //user jerry submits another topology into a full cluster + // topo3 should not be able to scheduled + topoMap.put(topo3.getId(), topo3); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 1, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + //make sure that topo-3 didn't get scheduled. + Assert.assertEquals("correct topology in attempted queue", rs.getUser("jerry").getTopologiesAttempted().iterator().next().getName(), "topo-3"); + + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + + //user jerry submits another topology but this one should be scheduled since it has higher priority than than the + //rest of jerry's running topologies + topoMap.put(topo4.getId(), topo4); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 2, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + Assert.assertTrue("correct topology in attempted queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("jerry").getTopologiesAttempted()) != null); + //Either topo-1 or topo-2 should have gotten evicted + Assert.assertTrue("correct topology in attempted queue", ((TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-1", rs.getUser("jerry").getTopologiesAttempted())) != null) + || (TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("jerry").getTopologiesAttempted()) != null)); + //assert that topo-4 got scheduled + Assert.assertTrue("correct topology in running queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-4", rs.getUser("jerry").getTopologiesRunning()) != null); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + } + + /** + * If users are above his or her guarantee, check if topology eviction works correct + */ + @Test + public void TestOverGuaranteeEviction() { + INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest(); + Map<String, Number> resourceMap = new HashMap<String, Number>(); + resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0); + resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0); + Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap); + Config config = new Config(); + config.putAll(Utils.readDefaultConfig()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName()); + config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName()); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName()); + config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500); + config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); + Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); + resourceUserPool.put("jerry", new HashMap<String, Number>()); + resourceUserPool.get("jerry").put("cpu", 70.0); + resourceUserPool.get("jerry").put("memory", 700.0); + + resourceUserPool.put("bobby", new HashMap<String, Number>()); + resourceUserPool.get("bobby").put("cpu", 100.0); + resourceUserPool.get("bobby").put("memory", 1000.0); + + resourceUserPool.put("derek", new HashMap<String, Number>()); + resourceUserPool.get("derek").put("cpu", 25.0); + resourceUserPool.get("derek").put("memory", 250.0); + + config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); + Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); + + config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); + + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10); + + Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); + topoMap.put(topo1.getId(), topo1); + topoMap.put(topo3.getId(), topo3); + topoMap.put(topo4.getId(), topo4); + topoMap.put(topo5.getId(), topo5); + + Topologies topologies = new Topologies(topoMap); + + ResourceAwareScheduler rs = new ResourceAwareScheduler(); + + rs.prepare(config); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + + //user derek submits another topology into a full cluster + //topo6 should not be able to scheduled intially, but since topo6 has higher priority than topo5 + //topo5 will be evicted so that topo6 can be scheduled + topoMap.put(topo6.getId(), topo6); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + //topo5 will be evicted since topo6 has higher priority + Assert.assertEquals("correct topology in attempted queue", "topo-5", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + + //user jerry submits topo2 + topoMap.put(topo2.getId(), topo2); + topologies = new Topologies(topoMap); + rs.schedule(topologies, cluster); + + for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size()); + + for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 0, rs.getUser("derek").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 2, rs.getUser("derek").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size()); + Assert.assertEquals("correct topology in attempted queue", "topo-6", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName()); + + for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) { + Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))); + } + Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size()); + Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size()); + Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size()); + Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size()); + } +}