Repository: storm
Updated Branches:
  refs/heads/1.x-branch 0efb94ce0 -> d182584a6


http://git-wip-us.apache.org/repos/asf/storm/blob/d182584a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 44f2136..cecc6d1 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -79,7 +79,6 @@ public class TestResourceAwareScheduler {
         
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
0.0);
         defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 
8192.0);
         defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
-        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
     }
 
     @Test
@@ -103,7 +102,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(0, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors11 = new ArrayList<>();
         executors11.add(new ExecutorDetails(1, 1));
@@ -123,7 +123,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(2, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors21 = new ArrayList<>();
         executors21.add(new ExecutorDetails(1, 1));
@@ -165,7 +166,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0, "user");
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         topoMap.put(topology1.getId(), topology1);
         Topologies topologies = new Topologies(topoMap);
@@ -208,14 +209,14 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder(); // a topology with 
two unconnected partitions
         builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
         builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
         StormTopology stormTopology2 = builder2.createTopology();
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -271,7 +272,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -319,7 +320,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -409,7 +410,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder();
         builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
@@ -419,7 +420,7 @@ public class TestResourceAwareScheduler {
         // memory requirement is large enough so that two executors can not be 
fully assigned to one node
         config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
1280.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0, "user");
 
         // Test1: When a worker fails, RAS does not alter existing assignments 
on healthy workers
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -567,7 +568,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
 
         // topo2 has 4 large tasks
         TopologyBuilder builder2 = new TopologyBuilder();
@@ -576,7 +577,7 @@ public class TestResourceAwareScheduler {
         Config config2 = new Config();
         config2.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
 
         // topo3 has 4 large tasks
         TopologyBuilder builder3 = new TopologyBuilder();
@@ -585,7 +586,7 @@ public class TestResourceAwareScheduler {
         Config config3 = new Config();
         config3.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap3 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3);
-        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0);
+        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0, "user");
 
         // topo4 has 12 small tasks, whose mem usage does not exactly divide a 
node's mem capacity
         TopologyBuilder builder4 = new TopologyBuilder();
@@ -594,7 +595,7 @@ public class TestResourceAwareScheduler {
         Config config4 = new Config();
         config4.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap4 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4);
-        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0);
+        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0, "user");
 
         // topo5 has 40 small tasks, it should be able to exactly use up both 
the cpu and mem in the cluster
         TopologyBuilder builder5 = new TopologyBuilder();
@@ -603,7 +604,7 @@ public class TestResourceAwareScheduler {
         Config config5 = new Config();
         config5.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap5 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5);
-        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0);
+        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0, "user");
 
         // Test1: Launch topo 1-3 together, it should be able to use up either 
mem or cpu resource due to exact division
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -691,7 +692,7 @@ public class TestResourceAwareScheduler {
         config1.putAll(defaultTopologyConf);
         config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         Map<String, TopologyDetails> topoMap = new HashMap<>();
@@ -713,7 +714,7 @@ public class TestResourceAwareScheduler {
         config2.putAll(defaultTopologyConf);
         config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config2);
         topoMap = new HashMap<>();
         topoMap.put(topology2.getId(), topology2);
@@ -769,16 +770,17 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 20);
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "bobby");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -828,8 +830,6 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
128.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
-
         Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -845,11 +845,16 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 30);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 30);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 30);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            30, TOPOLOGY_SUBMITTER);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -894,7 +899,8 @@ public class TestResourceAwareScheduler {
         LOG.info("{} - {}", topo.getName(), queue);
         Assert.assertEquals("check order", topo.getName(), "topo-2");
 
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30, 10);
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30,
+            10, TOPOLOGY_SUBMITTER);
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
@@ -954,29 +960,38 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "jerry");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "jerry");
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "jerry");
+
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "bobby");
+        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "bobby");
+        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "bobby");
+        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "bobby");
+        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "bobby");
+
+        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, "derek");
+        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8,
+            29, "derek");
+        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "derek");
+        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "derek");
+        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1040,10 +1055,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1103,20 +1118,20 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29);
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1228,8 +1243,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29,
+            "user");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10,
+            "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1283,11 +1300,12 @@ public class TestResourceAwareScheduler {
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1343,13 +1361,10 @@ public class TestResourceAwareScheduler {
 
         StormTopology stormTopology = builder.createTopology();
         TopologyDetails topo = new TopologyDetails("topo-1", config, 
stormTopology,
-                0,
-                genExecsAndComps(stormTopology), 0);
+                0, genExecsAndComps(stormTopology), 0, "jerry");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo.getId(), topo);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d182584a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
index 93e4b75..ce4d707 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
@@ -42,7 +42,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -64,7 +64,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -95,7 +95,8 @@ public class TestUser {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, 
Time.currentTimeSecs() - 24, 9);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1,
+            Time.currentTimeSecs() - 24, 9, "user1");
 
         User user1 = new User("user1", resourceGuaranteeMap);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d182584a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index bc69725..f745621 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -63,20 +63,20 @@ public class TestUtilsForResourceAwareScheduler {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
-    public static List<TopologyDetails> getListOfTopologies(Config config) {
-
-        List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
-
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9));
+    public static List<TopologyDetails> getListOfTopologies(Config config, String user) {
+
+        List<TopologyDetails> topos = new LinkedList<>();
+
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0, user ));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9, user));
         return topos;
     }
 
@@ -140,8 +140,8 @@ public class TestUtilsForResourceAwareScheduler {
         return retMap;
     }
 
-    public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
-                                              int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+    public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism, int launchTime, int priority, String owner) {
 
         Config conf = new Config();
         conf.putAll(config);
@@ -151,7 +151,7 @@ public class TestUtilsForResourceAwareScheduler {
         StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
-                genExecsAndComps(topology), launchTime);
+                genExecsAndComps(topology), launchTime, owner);
         return topo;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d182584a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
index f2d65bd..9b24ab8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
@@ -74,20 +74,18 @@ public class TestDefaultEvictionStrategy {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -189,19 +187,18 @@ public class TestDefaultEvictionStrategy {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -296,21 +293,22 @@ public class TestDefaultEvictionStrategy {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -476,20 +474,20 @@ public class TestDefaultEvictionStrategy {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "jerry");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -633,20 +631,20 @@ public class TestDefaultEvictionStrategy {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
 
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);

http://git-wip-us.apache.org/repos/asf/storm/blob/d182584a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 537a89e..6eb6960 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -100,7 +100,7 @@ public class TestDefaultResourceAwareStrategy {
 
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
                 TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy)
-                , this.currentTime);
+                , this.currentTime, "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo.getId(), topo);
@@ -227,8 +227,10 @@ public class TestDefaultResourceAwareStrategy {
 
         //generate topologies
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10,
+            "user");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10,
+            "user");
 
         topoMap.put(topo1.getId(), topo1);
 

Reply via email to