[ 
https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058836#comment-15058836
 ] 

ASF GitHub Bot commented on STORM-898:
--------------------------------------

Github user rfarivar commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r47699226
  
    --- Diff: 
storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
 ---
    @@ -0,0 +1,1176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import backtype.storm.Config;
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.INimbus;
    +import backtype.storm.scheduler.IScheduler;
    +import backtype.storm.scheduler.SchedulerAssignment;
    +import backtype.storm.scheduler.SchedulerAssignmentImpl;
    +import backtype.storm.scheduler.SupervisorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.utils.Time;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.validation.ConfigValidation;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +public class TestResourceAwareScheduler {
    +
    +    private static final int NUM_SUPS = 20;
    +    private static final int NUM_WORKERS_PER_SUP = 4;
    +    private final String TOPOLOGY_SUBMITTER = "jerry";
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(TestResourceAwareScheduler.class);
    +
    +    @Test
    +    public void TestReadInResourceAwareSchedulerUserPools() {
    +
    +        Map fromFile = 
Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
    +        LOG.info("fromFile: {}", fromFile);
    +        ConfigValidation.validateFields(fromFile);
    +    }
    +
    +    @Test
    +    public void TestTopologySortedInCorrectOrder() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
    +
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 1000);
    +        resourceUserPool.get("jerry").put("memory", 8192.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 10000.0);
    +        resourceUserPool.get("bobby").put("memory", 32768);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 5000.0);
    +        resourceUserPool.get("derek").put("memory", 16384.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 8, 30);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 30);
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 20);
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 24, 30);
    +
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +        topoMap.put(topo5.getId(), topo5);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        Set<TopologyDetails> queue = 
rs.getUser("jerry").getTopologiesPending();
    +        Assert.assertEquals("check size", queue.size(), 0);
    +
    +        queue = rs.getUser("jerry").getTopologiesRunning();
    +
    +        Iterator<TopologyDetails> itr = queue.iterator();
    +
    +        TopologyDetails topo = itr.next();
    +        LOG.info("{} - {}", topo.getName(), queue);
    +        Assert.assertEquals("check order", topo.getName(), "topo-4");
    +
    +        topo = itr.next();
    +        LOG.info("{} - {}", topo.getName(), queue);
    +        Assert.assertEquals("check order", topo.getName(), "topo-1");
    +
    +        topo = itr.next();
    +        LOG.info("{} - {}", topo.getName(), queue);
    +        Assert.assertEquals("check order", topo.getName(), "topo-5");
    +
    +        topo = itr.next();
    +        LOG.info("{} - {}", topo.getName(), queue);
    +        Assert.assertEquals("check order", topo.getName(), "topo-3");
    +
    +        topo = itr.next();
    +        LOG.info("{} - {}", topo.getName(), queue);
    +        Assert.assertEquals("check order", topo.getName(), "topo-2");
    +
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 30, 10);
    +        topoMap.put(topo6.getId(), topo6);
    +
    +        topologies = new Topologies(topoMap);
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        queue = rs.getUser("jerry").getTopologiesRunning();
    +        itr = queue.iterator();
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-6");
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-4");
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-1");
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-5");
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-3");
    +
    +        topo = itr.next();
    +        Assert.assertEquals("check order", topo.getName(), "topo-2");
    +
    +        queue = rs.getUser("jerry").getTopologiesPending();
    +        Assert.assertEquals("check size", queue.size(), 0);
    +    }
    +
    +    @Test
    +    public void TestMultipleUsers() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 1000);
    +        resourceUserPool.get("jerry").put("memory", 8192.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 10000.0);
    +        resourceUserPool.get("bobby").put("memory", 32768);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 5000.0);
    +        resourceUserPool.get("derek").put("memory", 16384.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 8, 29);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 29);
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 20);
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 24, 29);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 8, 29);
    +        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 29);
    +        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 20);
    +        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 24, 29);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 8, 29);
    +        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 29);
    +        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 16, 20);
    +        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 24, 29);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +        topoMap.put(topo5.getId(), topo5);
    +        topoMap.put(topo6.getId(), topo6);
    +        topoMap.put(topo7.getId(), topo7);
    +        topoMap.put(topo8.getId(), topo8);
    +        topoMap.put(topo9.getId(), topo9);
    +        topoMap.put(topo10.getId(), topo10);
    +        topoMap.put(topo11.getId(), topo11);
    +        topoMap.put(topo12.getId(), topo12);
    +        topoMap.put(topo13.getId(), topo13);
    +        topoMap.put(topo14.getId(), topo14);
    +        topoMap.put(topo15.getId(), topo15);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : topoMap.values()) {
    +            
Assert.assertTrue(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +
    +        for (User user : rs.getUserMap().values()) {
    +            Assert.assertEquals(user.getTopologiesPending().size(), 0);
    +            Assert.assertEquals(user.getTopologiesRunning().size(), 5);
    +        }
    +    }
    +
    +    @Test
    +    public void testHandlingClusterSubscription() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 1000);
    +        resourceUserPool.get("jerry").put("memory", 8192.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 10000.0);
    +        resourceUserPool.get("bobby").put("memory", 32768);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 5000.0);
    +        resourceUserPool.get("derek").put("memory", 16384.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
Time.currentTimeSecs() - 8, 29);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo2.getId(), topo2);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        int fullyScheduled = 0;
    +        for (TopologyDetails topo : topoMap.values()) {
    +            if 
(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())))
 {
    +                fullyScheduled++;
    +            }
    +        }
    +        Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
    +        Assert.assertEquals("# of topologies schedule attempted", 1, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of topologies running", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of topologies schedule pending", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +    }
    +
    +    /**
    +     * The resources in the cluster are limited. In the first round of 
scheduling, all resources in the cluster are used.
    +     * User jerry submits another topology.  Since user jerry has his 
resource guarantees satisfied, and user bobby
    +     * has exceeded his resource guarantee, topo-3 from user bobby should 
be evicted.
    +     */
    +    @Test
    +    public void testEviction() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
500);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
500);
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 200.0);
    +        resourceUserPool.get("jerry").put("memory", 2000.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 100.0);
    +        resourceUserPool.get("bobby").put("memory", 1000.0);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 200.0);
    +        resourceUserPool.get("derek").put("memory", 2000.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +
    +        //user jerry submits another topology
    +        topoMap.put(topo6.getId(), topo6);
    +        topologies = new Topologies(topoMap);
    +
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("bobby").getTopologiesRunning().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesAttempted()) {
    +            Assert.assertFalse("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of attempted topologies", 1, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +    }
    +
    +    @Test
    +    public void TestEvictMultipleTopologies() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
500);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
500);
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 200.0);
    +        resourceUserPool.get("jerry").put("memory", 2000.0);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 100.0);
    +        resourceUserPool.get("derek").put("memory", 1000.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +        topoMap.put(topo5.getId(), topo5);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +
    +        //user jerry submits another topology
    +        topoMap.put(topo1.getId(), topo1);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesAttempted()) {
    +            Assert.assertFalse("correct topology to evict", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of attempted topologies", 2, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of running topologies", 0, 
rs.getUser("bobby").getTopologiesRunning().size());
    +    }
    +
    +    /**
    +     * Eviction order:
    +     * topo-3: user bobby doesn't have any resource guarantees and topo-3 is the lowest priority topology of user bobby
    +     * topo-2: user bobby doesn't have any resource guarantees and topo-2 is the next lowest priority topology of user bobby
    +     * topo-4: user derek has exceeded his resource guarantee while user jerry has not.  topo-4 and topo-5 have the same
    +     * priority, but topo-5 was submitted earlier, thus we choose topo-4 (the more recently submitted one) to evict
    +     */
    +    @Test
    +    public void 
TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
500);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
500);
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 300.0);
    +        resourceUserPool.get("jerry").put("memory", 3000.0);
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 100.0);
    +        resourceUserPool.get("derek").put("memory", 1000.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 15, 29);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +        topoMap.put(topo5.getId(), topo5);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +
    +        //user jerry submits another topology
    +        topoMap.put(topo1.getId(), topo1);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesAttempted()) {
    +            Assert.assertFalse("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of attempted topologies", 1, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("correct topology to evict", 
rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), 
"topo-3");
    +
    +        topoMap.put(topo6.getId(), topo6);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesAttempted()) {
    +            Assert.assertFalse("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of attempted topologies", 2, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of running topologies", 0, 
rs.getUser("bobby").getTopologiesRunning().size());
    +
    +        Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", 
rs.getUser("bobby").getTopologiesAttempted()) != null);
    +        Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", 
rs.getUser("bobby").getTopologiesAttempted()) != null);
    +
    +        topoMap.put(topo7.getId(), topo7);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 3, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesAttempted()) {
    +            Assert.assertFalse("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 1, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +        Assert.assertEquals("correct topology to evict", 
rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), 
"topo-4");
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesAttempted()) {
    +            Assert.assertFalse("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of attempted topologies", 2, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of running topologies", 0, 
rs.getUser("bobby").getTopologiesRunning().size());
    +
    +        Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", 
rs.getUser("bobby").getTopologiesAttempted()) != null);
    +        Assert.assertTrue("correct topology to evict", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", 
rs.getUser("bobby").getTopologiesAttempted()) != null);
    +    }
    +
    +    /**
    +     * If topologies from other users cannot be evicted to make space,
    +     * check whether the current user has a lower-priority topology of his own that can be evicted instead
    +     */
    +    @Test
    +    public void TestEvictTopologyFromItself() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
500);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
500);
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 200.0);
    +        resourceUserPool.get("jerry").put("memory", 2000.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 100.0);
    +        resourceUserPool.get("bobby").put("memory", 1000.0);
    +
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 100.0);
    +        resourceUserPool.get("derek").put("memory", 1000.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo2.getId(), topo2);
    +        topoMap.put(topo5.getId(), topo5);
    +        topoMap.put(topo6.getId(), topo6);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +
    +        //user jerry submits another topology into a full cluster
    +        // topo3 should not be able to scheduled
    +        topoMap.put(topo3.getId(), topo3);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 1, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +        //make sure that topo-3 didn't get scheduled.
    +        Assert.assertEquals("correct topology in attempted queue", 
rs.getUser("jerry").getTopologiesAttempted().iterator().next().getName(), 
"topo-3");
    +
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +
    +        //user jerry submits another topology but this one should be 
scheduled since it has higher priority than than the
    +        //rest of jerry's running topologies
    +        topoMap.put(topo4.getId(), topo4);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 2, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +        Assert.assertTrue("correct topology in attempted queue", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", 
rs.getUser("jerry").getTopologiesAttempted()) != null);
    +        //Either topo-1 or topo-2 should have gotten evicted
    +        Assert.assertTrue("correct topology in attempted queue", 
((TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-1", 
rs.getUser("jerry").getTopologiesAttempted())) != null)
    +                || 
(TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", 
rs.getUser("jerry").getTopologiesAttempted()) != null));
    +        //assert that topo-4 got scheduled
    +        Assert.assertTrue("correct topology in running queue", 
TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-4", 
rs.getUser("jerry").getTopologiesRunning()) != null);
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +    }
    +
    +    /**
    +     * Tests eviction behavior when users are running on or over their resource guarantees.
    +     * NOTE(review): the previous comment here was a verbatim copy of the Javadoc for
    +     * TestEvictTopologyFromItself; confirm this description against the assertions in the test body.
    +     */
    +    @Test
    +    public void TestOverGuaranteeEviction() {
    +        INimbus iNimbus = new 
TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<String, Number>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
    +        Map<String, SupervisorDetails> supMap = 
TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
    +        Config config = new Config();
    +        config.putAll(Utils.readDefaultConfig());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, 
backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, 
backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
500);
    +        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
500);
    +        Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
    +        resourceUserPool.put("jerry", new HashMap<String, Number>());
    +        resourceUserPool.get("jerry").put("cpu", 70.0);
    +        resourceUserPool.get("jerry").put("memory", 700.0);
    +
    +        resourceUserPool.put("bobby", new HashMap<String, Number>());
    +        resourceUserPool.get("bobby").put("cpu", 100.0);
    +        resourceUserPool.get("bobby").put("memory", 1000.0);
    +
    +
    +        resourceUserPool.put("derek", new HashMap<String, Number>());
    +        resourceUserPool.get("derek").put("cpu", 25.0);
    +        resourceUserPool.get("derek").put("memory", 250.0);
    +
    +        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
    +
    +        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 20);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
    +
    +        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
    +
    +        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 29);
    +        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
Time.currentTimeSecs() - 2, 10);
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
    +        topoMap.put(topo1.getId(), topo1);
    +        topoMap.put(topo3.getId(), topo3);
    +        topoMap.put(topo4.getId(), topo4);
    +        topoMap.put(topo5.getId(), topo5);
    +
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +
    +        for (TopologyDetails topo : 
rs.getUser("derek").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("derek").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("derek").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("derek").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("derek").getTopologiesInvalid().size());
    +
    +        for (TopologyDetails topo : 
rs.getUser("bobby").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 2, 
rs.getUser("bobby").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("bobby").getTopologiesPending().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("bobby").getTopologiesInvalid().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("bobby").getTopologiesAttempted().size());
    +
    +        // User derek submits another topology into a full cluster;
    +        // topo6 should not be able to be scheduled
    +        topoMap.put(topo6.getId(), topo6);
    +        topologies = new Topologies(topoMap);
    +        rs.schedule(topologies, cluster);
    +
    +        for (TopologyDetails topo : 
rs.getUser("jerry").getTopologiesRunning()) {
    +            Assert.assertTrue("Assert scheduling topology success", 
TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
    +        }
    +        Assert.assertEquals("# of running topologies", 1, 
rs.getUser("jerry").getTopologiesRunning().size());
    +        Assert.assertEquals("# of pending topologies", 0, 
rs.getUser("jerry").getTopologiesPending().size());
    +        Assert.assertEquals("# of attempted topologies", 0, 
rs.getUser("jerry").getTopologiesAttempted().size());
    +        Assert.assertEquals("# of invalid topologies", 0, 
rs.getUser("jerry").getTopologiesInvalid().size());
    +
    +        for (TopologyDe
    --- End diff --
    
    extra empty line?


> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
>                 Key: STORM-898
>                 URL: https://issues.apache.org/jira/browse/STORM-898
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual 
> users a guarantee of how much CPU/Memory/Network they will be able to use in 
> a cluster.  We would also like to know which topologies a user feels are the 
> most important to keep running if there are not enough resources to run all 
> of their topologies.
> Each user should be able to specify if their topology is production, staging, 
> or development. Within each of those categories a user should be able to give 
> a topology a priority, 0 to 10 with 10 being the highest priority (or 
> something like this).
> If there are not enough resources on a cluster to run a topology, assume this 
> topology is running using resources and find the user that is most over their 
> guaranteed resources.  Shoot the lowest priority topology for that user, and 
> repeat until this topology is able to run, or this topology would be the one 
> shot.  Ideally we don't actually shoot anything until we know that we would 
> have made enough room.
> If the cluster is over-subscribed, everyone is under their guarantee, and 
> this topology would not put the user over their guarantee, shoot the lowest 
> priority topology in this worker's resource pool until there is enough room to 
> run the topology or this topology is the one that would be shot.  We might 
> also want to think about what to do if we are going to shoot a production 
> topology in an oversubscribed case, and perhaps we can shoot a non-production 
> topology instead even if the other user is not over their guarantee.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to