http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj 
b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 339d5b3..99aeb84 100644
--- 
a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ 
b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -138,7 +138,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -175,7 +175,7 @@
                    5
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -216,7 +216,7 @@
                     executor2 "bolt1"
                     executor3 "bolt1"
                     executor4 "bolt1"
-                    executor5 "bolt2"})]
+                    executor5 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -255,7 +255,7 @@
                     executor2 "bolt1"
                     executor3 "bolt2"
                     executor4 "bolt3"
-                    executor5 "bolt4"})]
+                    executor5 "bolt4"} "user")]
     (let [node-map (Node/getAllNodesFrom single-cluster)
          free-pool (FreePool. )
          default-pool (DefaultPool. )]
@@ -302,7 +302,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})
+                    executor3 "bolt2"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"}
                     (StormTopology.)
@@ -310,7 +310,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -383,7 +383,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -427,7 +427,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -474,7 +474,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -483,7 +483,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -579,7 +579,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -588,7 +588,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -629,34 +629,31 @@
 (deftest test-multitenant-scheduler
   (let [supers (gen-supervisors 10)
        topology1 (TopologyDetails. "topology1"
-                   {TOPOLOGY-NAME "topology-name-1"
-                    TOPOLOGY-SUBMITTER-USER "userC"}
+                   {TOPOLOGY-NAME "topology-name-1"}
                    (StormTopology.)
                    4
                    (mk-ed-map [["spout1" 0 5]
                                ["bolt1" 5 10]
                                ["bolt2" 10 15]
-                               ["bolt3" 15 20]]))
+                               ["bolt3" 15 20]]) "userC")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-ISOLATED-MACHINES 2
-                     TOPOLOGY-SUBMITTER-USER "userA"}
+                     TOPOLOGY-ISOLATED-MACHINES 2}
                     (StormTopology.)
                     4
                     (mk-ed-map [["spout11" 0 5]
                                 ["bolt12" 5 6]
                                 ["bolt13" 6 7]
-                                ["bolt14" 7 10]]))
+                                ["bolt14" 7 10]]) "userA")
        topology3 (TopologyDetails. "topology3"
                     {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-ISOLATED-MACHINES 5
-                     TOPOLOGY-SUBMITTER-USER "userB"}
+                     TOPOLOGY-ISOLATED-MACHINES 5}
                     (StormTopology.)
                     10
                     (mk-ed-map [["spout21" 0 10]
                                 ["bolt22" 10 20]
                                 ["bolt23" 20 30]
-                                ["bolt24" 30 40]]))
+                                ["bolt24" 30 40]]) "userB")
        cluster (Cluster. (Nimbus$StandaloneINimbus.) supers {} nil)
        node-map (Node/getAllNodesFrom cluster)
        topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
@@ -682,14 +679,13 @@
 (deftest test-force-free-slot-in-bad-state
   (let [supers (gen-supervisors 1)
         topology1 (TopologyDetails. "topology1"
-                                    {TOPOLOGY-NAME "topology-name-1"
-                                     TOPOLOGY-SUBMITTER-USER "userC"}
+                                    {TOPOLOGY-NAME "topology-name-1"}
                                     (StormTopology.)
                                     4
                                     (mk-ed-map [["spout1" 0 5]
                                                 ["bolt1" 5 10]
                                                 ["bolt2" 10 15]
-                                                ["bolt3" 15 20]]))
+                                                ["bolt3" 15 20]]) "userC")
         existing-assignments {
                                "topology1" (SchedulerAssignmentImpl. 
"topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
                                                                                
   (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
@@ -719,25 +715,22 @@
   (testing "Assiging same worker slot to different topologies is bad state"
     (let [supers (gen-supervisors 5)
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout1" 0 1]]))
+                      (mk-ed-map [["spout1" 0 1]]) "userC")
           topology2 (TopologyDetails. "topology2"
                       {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-ISOLATED-MACHINES 2
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                       TOPOLOGY-ISOLATED-MACHINES 2}
                       (StormTopology.)
                       2
-                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]))
+                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]) "userA")
           topology3 (TopologyDetails. "topology3"
                       {TOPOLOGY-NAME "topology-name-3"
-                       TOPOLOGY-ISOLATED-MACHINES 1
-                       TOPOLOGY-SUBMITTER-USER "userB"}
+                       TOPOLOGY-ISOLATED-MACHINES 1}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout21" 2 3]]))
+                      (mk-ed-map [["spout21" 2 3]]) "userB")
           worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1)
           existing-assignments {"topology2" (SchedulerAssignmentImpl. 
"topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments})
                                 "topology3" (SchedulerAssignmentImpl. 
"topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})}
@@ -761,11 +754,10 @@
     (let [supers (gen-supervisors 1)
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout11" 0 1]]))
+                      (mk-ed-map [["spout11" 0 1]]) "userA")
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
                                   {(ExecutorDetails. 0 0) (WorkerSlot. 
"super0" port-not-reported-by-supervisor)})}
@@ -787,19 +779,17 @@
           dead-supervisor "super1"
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout11" 0 1]
-                                  ["bolt12" 1 2]]))
+                                  ["bolt12" 1 2]]) "userA")
           topology2 (TopologyDetails. "topology2"
-                      {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-2"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout21" 4 5]
-                                  ["bolt22" 5 6]]))
+                                  ["bolt22" 5 6]]) "userA")
           worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1)
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
@@ -837,11 +827,10 @@
         supers {"super1" super1 "super2" super2}
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userA"
                      TOPOLOGY-ISOLATED-MACHINES 1}
                     (StormTopology.)
                     7
-                    (mk-ed-map [["spout21" 0 7]]))
+                    (mk-ed-map [["spout21" 0 7]]) "userA")
         existing-assignments {"topology1"
                               (SchedulerAssignmentImpl. "topology1"
                                 {(ExecutorDetails. 0 0) (WorkerSlot. "super1" 
1)

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/clj/org/apache/storm/scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj 
b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
index 1c479a7..acd1f9f 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
@@ -69,13 +69,13 @@
         executor2 (ExecutorDetails. (int 6) (int 10))
         topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME 
"topology-name-1"} (StormTopology.) 1
                                    {executor1 "spout1"
-                                    executor2 "bolt1"})
+                                    executor2 "bolt1"} "user")
         ;; test topology.selectExecutorToComponent
         executor->comp (.selectExecutorToComponent topology1 (list executor1))
         _ (is (= (clojurify-executor->comp {executor1 "spout1"})
                  (clojurify-executor->comp executor->comp)))
         ;; test topologies.getById
-        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"} (StormTopology.) 1 {})
+        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"} (StormTopology.) 1 {} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2})
         _ (is (= "topology1" (->> "topology1"
                                   (.getById topologies)
@@ -104,19 +104,19 @@
                                     2
                                     {executor1 "spout1"
                                      executor2 "bolt1"
-                                     executor3 "bolt2"})
+                                     executor3 "bolt2"} "user")
         ;; topology2 is fully scheduled
         topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"}
                                     (StormTopology.)
                                     2
                                     {executor11 "spout11"
-                                     executor12 "bolt12"})
+                                     executor12 "bolt12"} "user")
         ;; topology3 needs scheduling, since the assignment is squeezed
         topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME 
"topology-name-3"}
                                     (StormTopology.)
                                     2
                                     {executor21 "spout21"
-                                     executor22 "bolt22"})
+                                     executor22 "bolt22"} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2 
"topology3" topology3})
         executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1))
                          executor2 (WorkerSlot. "supervisor2" (int 2))}

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/jvm/org/apache/storm/MockAutoCred.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/MockAutoCred.java 
b/storm-core/test/jvm/org/apache/storm/MockAutoCred.java
index 457ac50..1a746ca 100644
--- a/storm-core/test/jvm/org/apache/storm/MockAutoCred.java
+++ b/storm-core/test/jvm/org/apache/storm/MockAutoCred.java
@@ -42,7 +42,7 @@ public class MockAutoCred implements INimbusCredentialPlugin, 
IAutoCredentials,
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> conf) {
+    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topoConf) {
         credentials.put(NIMBUS_CRED_KEY, NIMBUS_CRED_VAL);
     }
 
@@ -58,7 +58,7 @@ public class MockAutoCred implements INimbusCredentialPlugin, 
IAutoCredentials,
     }
 
     @Override
-    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf) {
+    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String ownerPrincipal) {
         credentials.put(NIMBUS_CRED_KEY, NIMBUS_CRED_RENEW_VAL);
         credentials.put(GATEWAY_CRED_KEY, GATEWAY_CRED_RENEW_VAL);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 31dc992..69cd649 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1333,6 +1333,10 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         BlobStore store = blobStore;
         Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
         StormTopology topo = readStormTopologyAsNimbus(topoId, store);
+        if (!base.is_set_principal()) {
+            fixupBase(base, topoConf);
+            stormClusterState.updateStorm(topoId, base);
+        }
         Map<List<Integer>, String> rawExecToComponent = 
computeExecutorToComponent(topoId, base);
         Map<ExecutorDetails, String> executorsToComponent = new HashMap<>();
         for (Entry<List<Integer>, String> entry: 
rawExecToComponent.entrySet()) {
@@ -1341,19 +1345,21 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             executorsToComponent.put(execDetails, entry.getValue());
         }
         
-        return new TopologyDetails(topoId, topoConf, topo, 
base.get_num_workers(), executorsToComponent, base.get_launch_time_secs());
+        return new TopologyDetails(topoId, topoConf, topo, 
base.get_num_workers(), executorsToComponent, base.get_launch_time_secs(),
+            base.get_owner());
     }
     
     private void updateHeartbeats(String topoId, Set<List<Integer>> 
allExecutors, Assignment existingAssignment) {
         LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
         IStormClusterState state = stormClusterState;
         Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertExecutorBeats(state.executorBeats(topoId, 
existingAssignment.get_executor_node_port()));
-        Map<List<Integer>, Map<String, Object>> cache = 
StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), 
executorBeats, allExecutors, 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+        Map<List<Integer>, Map<String, Object>> cache = 
StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
+            executorBeats, allExecutors, 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
         heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
     }
     
     /**
-     * update all the heartbeats for all the topologies' executors
+     * Update all the heartbeats for all the topologies' executors.
      * @param existingAssignments current assignments (thrift)
      * @param topologyToExecutors topology ID to executors.
      */
@@ -1369,8 +1375,6 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         Map<List<Integer>, Map<String, Object>> hbCache = 
heartbeatsCache.get().get(topoId);
         LOG.debug("NEW  Computing alive executors for {}\nExecutors: 
{}\nAssignment: {}\nHeartbeat cache: {}",
                 topoId, allExecutors, assignment, hbCache);
-        //TODO need to consider all executors associated with a dead executor 
(in same slot) dead as well,
-        // don't just rely on heartbeat being the same
         
         int taskLaunchSecs = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
         Set<List<Integer>> ret = new HashSet<>();
@@ -1759,7 +1763,15 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             // we exclude its assignment, meaning that all the slots occupied 
by its assignment
             // will be treated as free slot in the scheduler code.
             if (!id.equals(scratchTopoId)) {
-                existingAssignments.put(id, state.assignmentInfo(id, null));
+                Assignment currentAssignment = state.assignmentInfo(id, null);
+                if (!currentAssignment.is_set_owner()) {
+                    TopologyDetails td = tds.get(id);
+                    if (td != null) {
+                        currentAssignment.set_owner(td.getTopologySubmitter());
+                        state.setAssignment(id, currentAssignment);
+                    }
+                }
+                existingAssignments.put(id, currentAssignment);
             }
         }
         // make the new assignments for topologies
@@ -1842,6 +1854,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                     workerResources.put(ni, resources);
                 }
                 newAssignment.set_worker_resources(workerResources);
+                TopologyDetails td = tds.get(topoId);
+                newAssignment.set_owner(td.getTopologySubmitter());
                 newAssignments.put(topoId, newAssignment);
             }
 
@@ -1894,7 +1908,13 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
     }
 
-    private void startTopology(String topoName, String topoId, TopologyStatus 
initStatus) throws KeyNotFoundException, AuthorizationException, IOException, 
InvalidTopologyException {
+    private void fixupBase(StormBase base, Map<String, Object> topoConf) {
+        base.set_owner((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+        base.set_principal((String) 
topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL));
+    }
+
+    private void startTopology(String topoName, String topoId, TopologyStatus 
initStatus, String owner, String principal)
+        throws KeyNotFoundException, AuthorizationException, IOException, 
InvalidTopologyException {
         assert(TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE 
== initStatus);
         IStormClusterState state = stormClusterState;
         BlobStore store = blobStore;
@@ -1911,7 +1931,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         base.set_status(initStatus);
         
base.set_num_workers(ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 
0));
         base.set_component_executors(numExecutors);
-        base.set_owner((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+        base.set_owner(owner);
+        base.set_principal(principal);
         base.set_component_debug(new HashMap<>());
         state.activateStorm(topoId, base);
         notifyTopologyActionListener(topoName, "activate");
@@ -2162,9 +2183,11 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         BlobStore store = blobStore;
         Collection<ICredentialsRenewer> renewers = credRenewers;
         Object lock = credUpdateLock;
-        List<String> assignedIds = state.activeStorms();
-        if (assignedIds != null) {
-            for (String id: assignedIds) {
+        Map<String, StormBase> assignedBases = state.topologyBases();
+        if (assignedBases != null) {
+            for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
+                String id = entry.getKey();
+                String ownerPrincipal = entry.getValue().get_principal();
                 Map<String, Object> topoConf = 
Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store)));
                 synchronized(lock) {
                     Credentials origCreds = state.credentials(id, null);
@@ -2172,8 +2195,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                         Map<String, String> origCredsMap = 
origCreds.get_creds();
                         Map<String, String> newCredsMap = new 
HashMap<>(origCredsMap);
                         for (ICredentialsRenewer renewer: renewers) {
-                            LOG.info("Renewing Creds For {} with {}", id, 
renewer);
-                            renewer.renew(newCredsMap, topoConf);
+                            LOG.info("Renewing Creds For {} with {} owned by 
{}", id, renewer, ownerPrincipal);
+                            renewer.renew(newCredsMap, topoConf, 
ownerPrincipal);
                         }
                         if (!newCredsMap.equals(origCredsMap)) {
                             state.setCredentials(id, new 
Credentials(newCredsMap), topoConf);
@@ -2340,7 +2363,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             ret.launchTimeSecs = 0;
         }
         ret.assignment = state.assignmentInfo(topoId, null);
-        ret.beats = Utils.OR(heartbeatsCache.get().get(topoId), 
Collections.<List<Integer>, Map<String, Object>>emptyMap());
+        ret.beats = Utils.OR(heartbeatsCache.get().get(topoId), 
Collections.emptyMap());
         ret.allComponents = new HashSet<>(ret.taskToComponent.values());
         return ret;
     }
@@ -2525,9 +2548,11 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             Set<String> topoAcl = new 
HashSet<>((List<String>)topoConf.getOrDefault(Config.TOPOLOGY_USERS, 
Collections.emptyList()));
             topoAcl.add(submitterPrincipal);
             topoAcl.add(submitterUser);
-            
-            topoConf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, 
Utils.OR(submitterPrincipal, ""));
-            topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, 
Utils.OR(submitterUser, systemUser)); //Don't let the user set who we launch as
+
+            String topologyPrincipal = Utils.OR(submitterPrincipal, "");
+            topoConf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, 
topologyPrincipal);
+            String topologyOwner = Utils.OR(submitterUser, systemUser);
+            topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, topologyOwner); 
//Don't let the user set who we launch as
             topoConf.put(Config.TOPOLOGY_USERS, new ArrayList<>(topoAcl));
             topoConf.put(Config.STORM_ZOOKEEPER_SUPERACL, 
conf.get(Config.STORM_ZOOKEEPER_SUPERACL));
             if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
@@ -2605,7 +2630,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                         throw new IllegalArgumentException("Inital Status of " 
+ options.get_initial_status() + " is not allowed.");
                             
                 }
-                startTopology(topoName, topoId, status);
+                startTopology(topoName, topoId, status, topologyOwner, 
topologyPrincipal);
             }
         } catch (Exception e) {
             LOG.warn("Topology submission exception. (topology name='{}')", 
topoName, e);

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index f5eed43..9253169 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -402,7 +402,7 @@ public class BasicContainer extends Container {
      * Compute the classpath for the worker process
      * @param stormJar the topology jar
      * @param dependencyLocations any dependencies from the topology
-     * @param stormVerison the version of the storm framework to use
+     * @param topoVersion the version of the storm framework to use
      * @return the full classpath
      */
     protected String getWorkerClassPath(String stormJar, List<String> 
dependencyLocations, SimpleVersion topoVersion) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index f878507..bfd55da 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -321,7 +321,7 @@ public abstract class Container implements Killable {
         File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
         if (!_ops.fileExists(workerArtifacts)) {
             _ops.forceMkdir(workerArtifacts);
-            _ops.setupWorkerArtifactsDir(_topoConf, workerArtifacts);
+            _ops.setupWorkerArtifactsDir(_assignment.get_owner(), 
workerArtifacts);
         }
     
         String user = getWorkerUser();
@@ -472,8 +472,8 @@ public abstract class Container implements Killable {
 
         if (_ops.fileExists(file)) {
             return _ops.slurpString(file).trim();
-        } else if (_topoConf != null) { 
-            return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        } else if (_assignment != null && _assignment.is_set_owner()) {
+            return _assignment.get_owner();
         }
         if (ConfigUtils.isLocalMode(_conf)) {
             return System.getProperty("user.name");

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 8fdd47c..6acc4b5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -276,6 +276,9 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                             if (slotsResources.containsKey(port)) {
                                 
localAssignment.set_resources(slotsResources.get(port));
                             }
+                            if (assignment.is_set_owner()) {
+                                
localAssignment.set_owner(assignment.get_owner());
+                            }
                             portTasks.put(port.intValue(), localAssignment);
                         }
                         List<ExecutorInfo> executorInfoList = 
localAssignment.get_executors();

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index cc4a387..1f8d4c3 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -211,8 +211,20 @@ public class Supervisor implements DaemonCommon, 
AutoCloseable {
         this.readState = new ReadClusterState(this);
         
         Set<String> downloadedTopoIds = 
SupervisorUtils.readDownloadedTopologyIds(conf);
-        for (String topoId : downloadedTopoIds) {
-            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        Map<Integer, LocalAssignment> portToAssignments = 
localState.getLocalAssignmentsMap();
+        if (portToAssignments != null) {
+            Map<String, LocalAssignment> assignments = new HashMap<>();
+            for (LocalAssignment la : 
localState.getLocalAssignmentsMap().values()) {
+                assignments.put(la.get_topology_id(), la);
+            }
+            for (String topoId : downloadedTopoIds) {
+                LocalAssignment la = assignments.get(topoId);
+                if (la != null) {
+                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, 
la.get_owner());
+                } else {
+                    LOG.warn("Could not find an owner for topo {}", topoId);
+                }
+            }
         }
         // do this after adding the references so we don't try to clean things 
being used
         localizer.startCleaner();

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 8582fb4..09c2b5d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,6 +19,7 @@ package org.apache.storm.daemon.supervisor;
 
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
@@ -95,16 +96,16 @@ public class SupervisorUtils {
     }
 
     /**
-     * For each of the downloaded topologies, adds references to the blobs 
that the topologies are using. This is used to reconstruct the cache on restart.
+     * For each of the downloaded topologies, adds references to the blobs 
that the topologies are using. This is used to reconstruct the
+     * cache on restart.
      * 
      * @param localizer
      * @param stormId
      * @param conf
      */
-    static void addBlobReferences(Localizer localizer, String stormId, 
Map<String, Object> conf) throws IOException {
+    static void addBlobReferences(Localizer localizer, String stormId, 
Map<String, Object> conf, String user) throws IOException {
         Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
         List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         if (blobstoreMap != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index af864e8..d0baf90 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.HashMap;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
@@ -59,15 +60,16 @@ public class UpdateBlobs implements Runnable {
             Map<String, Object> conf = supervisor.getConf();
             Set<String> downloadedStormIds = 
SupervisorUtils.readDownloadedTopologyIds(conf);
             AtomicReference<Map<Long, LocalAssignment>> newAssignment = 
supervisor.getCurrAssignment();
-            Set<String> assignedStormIds = new HashSet<>();
+            Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
             for (LocalAssignment localAssignment : 
newAssignment.get().values()) {
-                assignedStormIds.add(localAssignment.get_topology_id());
+                assignedStormIds.put(localAssignment.get_topology_id(), 
localAssignment);
             }
             for (String stormId : downloadedStormIds) {
-                if (assignedStormIds.contains(stormId)) {
+                LocalAssignment la = assignedStormIds.get(stormId);
+                if (la != null) {
                     String stormRoot = 
ConfigUtils.supervisorStormDistRoot(conf, stormId);
                     LOG.debug("Checking Blob updates for storm topology id {} 
With target_dir: {}", stormId, stormRoot);
-                    updateBlobsForTopology(conf, stormId, 
supervisor.getLocalizer());
+                    updateBlobsForTopology(conf, stormId, 
supervisor.getLocalizer(), la.get_owner());
                 }
             }
         } catch (Exception e) {
@@ -89,10 +91,9 @@ public class UpdateBlobs implements Runnable {
      * @param localizer
      * @throws IOException
      */
-    private void updateBlobsForTopology(Map<String, Object> conf, String 
stormId, Localizer localizer) throws IOException {
+    private void updateBlobsForTopology(Map<String, Object> conf, String 
stormId, Localizer localizer, String user) throws IOException {
         Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         try {
             localizer.updateBlobs(localresources, user);

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 038114a..9dc473f 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -101,10 +101,12 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     private class DownloadBaseBlobsDistributed implements Callable<Void> {
         protected final String _topologyId;
         protected final File _stormRoot;
-        
-        public DownloadBaseBlobsDistributed(String topologyId) throws 
IOException {
+        protected final String owner;
+
+        public DownloadBaseBlobsDistributed(String topologyId, String owner) 
throws IOException {
             _topologyId = topologyId;
             _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId));
+            this.owner = owner;
         }
         
         protected void downloadBaseBlobs(File tmproot) throws Exception {
@@ -145,7 +147,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                 try {
                     downloadBaseBlobs(tr);
                     _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-                    
_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, 
_topologyId), _stormRoot);
+                    _fsOps.setupStormCodeDir(owner, _stormRoot);
                     deleteAll = false;
                 } finally {
                     if (deleteAll) {
@@ -164,8 +166,8 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     
     private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
 
-        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
-            super(topologyId);
+        public DownloadBaseBlobsLocal(String topologyId, String owner) throws 
IOException {
+            super(topologyId, owner);
         }
         
         @Override
@@ -209,9 +211,11 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     
     private class DownloadBlobs implements Callable<Void> {
         private final String _topologyId;
+        private final String topoOwner;
 
-        public DownloadBlobs(String topologyId) {
+        public DownloadBlobs(String topologyId, String topoOwner) {
             _topologyId = topologyId;
+            this.topoOwner = topoOwner;
         }
 
         @Override
@@ -222,7 +226,6 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
 
                 @SuppressWarnings("unchecked")
                 Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-                String user = (String) 
topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
 
                 List<LocalResource> localResourceList = new ArrayList<>();
@@ -246,12 +249,12 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                 }
 
                 if (!localResourceList.isEmpty()) {
-                    File userDir = _localizer.getLocalUserFileCacheDir(user);
+                    File userDir = 
_localizer.getLocalUserFileCacheDir(topoOwner);
                     if (!_fsOps.fileExists(userDir)) {
                         _fsOps.forceMkdir(userDir);
                     }
-                    List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir);
-                    _fsOps.setupBlobPermissions(userDir, user);
+                    List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
+                    _fsOps.setupBlobPermissions(userDir, topoOwner);
                     if (!_symlinksDisabled) {
                         for (LocalizedResource localizedResource : 
localizedResources) {
                             String keyName = localizedResource.getKey();
@@ -309,9 +312,9 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
         if (localResource == null) {
             Callable<Void> c;
             if (_isLocalMode) {
-                c = new DownloadBaseBlobsLocal(topologyId);
+                c = new DownloadBaseBlobsLocal(topologyId, 
assignment.get_owner());
             } else {
-                c = new DownloadBaseBlobsDistributed(topologyId);
+                c = new DownloadBaseBlobsDistributed(topologyId, 
assignment.get_owner());
             }
             localResource = new 
LocalDownloadedResource(_execService.submit(c));
             _basicPending.put(topologyId, localResource);
@@ -362,7 +365,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
         final String topologyId = assignment.get_topology_id();
         LocalDownloadedResource localResource = _blobPending.get(topologyId);
         if (localResource == null) {
-            Callable<Void> c = new DownloadBlobs(topologyId);
+            Callable<Void> c = new DownloadBlobs(topologyId, 
assignment.get_owner());
             localResource = new 
LocalDownloadedResource(_execService.submit(c));
             _blobPending.put(topologyId, localResource);
         }
@@ -385,7 +388,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
             @SuppressWarnings("unchecked")
             Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
             if (blobstoreMap != null) {
-                String user = (String) 
topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String user = assignment.get_owner();
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
                 
                 for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index e0f57cd..95cbf63 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -85,7 +85,7 @@ public class MultitenantScheduler implements IScheduler {
     defaultPool.init(cluster, nodeIdToNode);
     
     for (TopologyDetails td: topologies.getTopologies()) {
-      String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
+      String user = td.getTopologySubmitter();
       LOG.debug("Found top {} run by user {}",td.getId(), user);
       NodePool pool = userPools.get(user);
       if (pool == null || !pool.canAdd(td)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/MessagingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java 
b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
index 5686bc3..b0b6625 100644
--- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java
+++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
@@ -62,7 +62,4 @@ public class MessagingTest {
             Assert.assertEquals(6 * 4, Testing.readTuples(results, 
"2").size());
         }
     }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
index 2159b4a..35c5790 100644
--- 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -162,8 +162,7 @@ public class ContainerTest {
         
         final List<String> topoGroups = Arrays.asList("t-group-a", 
"t-group-b");
         final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
-        
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+
         topoConf.put(DaemonConfig.LOGS_GROUPS, logGroups);
         topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
         topoConf.put(DaemonConfig.LOGS_USERS, logUsers);
@@ -183,6 +182,7 @@ public class ContainerTest {
         
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", 8080, la, null, workerId, topoConf, ops);
         
@@ -235,7 +235,6 @@ public class ContainerTest {
         final File workerPidsRoot = new File(workerRoot, "pids");
         
         final Map<String, Object> topoConf = new HashMap<>();
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
@@ -252,6 +251,7 @@ public class ContainerTest {
         ResourceIsolationInterface iso = 
mock(ResourceIsolationInterface.class);
         
         LocalAssignment la = new LocalAssignment();
+        la.set_owner(user);
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", port, la, iso, workerId, topoConf, ops);

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index c04cfbe..2461b33 100644
--- 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -47,8 +47,10 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadBaseTopologyBlobs() throws Exception {
         final String topoId = "TOPO";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
@@ -96,7 +98,7 @@ public class AsyncLocalizerTest {
             //Extracting the dir from the jar
             verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), 
eq("resources"), any(File.class));
             verify(ops).moveDirectoryPreferAtomic(any(File.class), 
eq(fStormRoot));
-            verify(ops).setupStormCodeDir(topoConf, fStormRoot);
+            verify(ops).setupStormCodeDir(user, fStormRoot);
             
             verify(ops, never()).deleteIfExists(any(File.class));
         } finally {
@@ -110,15 +112,16 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadTopologyBlobs() throws Exception {
         final String topoId = "TOPO-12345";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
         la.add_to_executors(ei);
         final String topoName = "TOPO";
         final int port = 8080;
-        final String user = "user";
         final String simpleLocalName = "simple.txt";
         final String simpleKey = "simple";
         
@@ -149,7 +152,6 @@ public class AsyncLocalizerTest {
 
         Map<String, Object> topoConf = new HashMap<>(conf);
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         topoConf.put(Config.TOPOLOGY_NAME, topoName);
         
         List<LocalizedResource> localizedList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index e81f7bf..65cad2d 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -80,7 +80,6 @@ public class TestResourceAwareScheduler {
         
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
0.0);
         defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 
8192.0);
         defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
-        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
     }
 
     @Test
@@ -104,7 +103,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(0, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<>(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<>(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors11 = new ArrayList<>();
         executors11.add(new ExecutorDetails(1, 1));
@@ -124,7 +124,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(2, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<>(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<>(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors21 = new ArrayList<>();
         executors21.add(new ExecutorDetails(1, 1));
@@ -166,7 +167,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0, "user");
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         topoMap.put(topology1.getId(), topology1);
         Topologies topologies = new Topologies(topoMap);
@@ -209,14 +210,14 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder(); // a topology with 
two unconnected partitions
         builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
         builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
         StormTopology stormTopology2 = builder2.createTopology();
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -272,7 +273,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -320,7 +321,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -410,7 +411,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder();
         builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
@@ -420,7 +421,7 @@ public class TestResourceAwareScheduler {
         // memory requirement is large enough so that two executors can not be 
fully assigned to one node
         config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
1280.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0, "user");
 
         // Test1: When a worker fails, RAS does not alter existing assignments 
on healthy workers
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -568,7 +569,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
 
         // topo2 has 4 large tasks
         TopologyBuilder builder2 = new TopologyBuilder();
@@ -577,7 +578,7 @@ public class TestResourceAwareScheduler {
         Config config2 = new Config();
         config2.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
 
         // topo3 has 4 large tasks
         TopologyBuilder builder3 = new TopologyBuilder();
@@ -586,7 +587,7 @@ public class TestResourceAwareScheduler {
         Config config3 = new Config();
         config3.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap3 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3);
-        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0);
+        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0, "user");
 
         // topo4 has 12 small tasks, whose mem usage does not exactly divide a 
node's mem capacity
         TopologyBuilder builder4 = new TopologyBuilder();
@@ -595,7 +596,7 @@ public class TestResourceAwareScheduler {
         Config config4 = new Config();
         config4.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap4 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4);
-        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0);
+        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0, "user");
 
         // topo5 has 40 small tasks, it should be able to exactly use up both 
the cpu and mem in the cluster
         TopologyBuilder builder5 = new TopologyBuilder();
@@ -604,7 +605,7 @@ public class TestResourceAwareScheduler {
         Config config5 = new Config();
         config5.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap5 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5);
-        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0);
+        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0, "user");
 
         // Test1: Launch topo 1-3 together, it should be able to use up either 
mem or cpu resource due to exact division
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -692,7 +693,7 @@ public class TestResourceAwareScheduler {
         config1.putAll(defaultTopologyConf);
         config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         Map<String, TopologyDetails> topoMap = new HashMap<>();
@@ -714,7 +715,7 @@ public class TestResourceAwareScheduler {
         config2.putAll(defaultTopologyConf);
         config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config2);
         topoMap = new HashMap<>();
         topoMap.put(topology2.getId(), topology2);
@@ -770,16 +771,17 @@ public class TestResourceAwareScheduler {
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -829,8 +831,6 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
128.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
-
         Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -846,11 +846,16 @@ public class TestResourceAwareScheduler {
 
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 30);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 30);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 30);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            30, TOPOLOGY_SUBMITTER);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -895,7 +900,8 @@ public class TestResourceAwareScheduler {
         LOG.info("{} - {}", topo.getName(), queue);
         Assert.assertEquals("check order", topo.getName(), "topo-2");
 
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30, 10);
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30,
+            10, TOPOLOGY_SUBMITTER);
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
@@ -955,29 +961,38 @@ public class TestResourceAwareScheduler {
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "jerry");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "jerry");
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "jerry");
+
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "bobby");
+        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "bobby");
+        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "bobby");
+        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "bobby");
+        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "bobby");
+
+        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, "derek");
+        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8,
+            29, "derek");
+        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "derek");
+        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "derek");
+        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1041,10 +1056,10 @@ public class TestResourceAwareScheduler {
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1104,20 +1119,20 @@ public class TestResourceAwareScheduler {
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29);
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1229,8 +1244,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29,
+            "user");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10,
+            "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1284,11 +1301,12 @@ public class TestResourceAwareScheduler {
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1344,13 +1362,10 @@ public class TestResourceAwareScheduler {
 
         StormTopology stormTopology = builder.createTopology();
         TopologyDetails topo = new TopologyDetails("topo-1", config, 
stormTopology,
-                0,
-                genExecsAndComps(stormTopology), 0);
+                0, genExecsAndComps(stormTopology), 0, "jerry");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo.getId(), topo);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java
index 73c5d2c..b36d83d 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java
@@ -42,7 +42,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -64,7 +64,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -95,7 +95,8 @@ public class TestUser {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1,
+            Time.currentTimeSecs() - 24, 9, "user1");
 
         User user1 = new User("user1", resourceGuaranteeMap);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index ba17ce3..beca679 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -63,20 +63,20 @@ public class TestUtilsForResourceAwareScheduler {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
-    public static List<TopologyDetails> getListOfTopologies(Config config) {
-
-        List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
-
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9));
+    public static List<TopologyDetails> getListOfTopologies(Config config, String user) {
+
+        List<TopologyDetails> topos = new LinkedList<>();
+
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0, user ));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9, user));
         return topos;
     }
 
@@ -141,7 +141,7 @@ public class TestUtilsForResourceAwareScheduler {
     }
 
     public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
-                                              int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+                                              int spoutParallelism, int boltParallelism, int launchTime, int priority, String owner) {
 
         Config conf = new Config();
         conf.putAll(config);
@@ -151,7 +151,7 @@ public class TestUtilsForResourceAwareScheduler {
         StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
-                genExecsAndComps(topology), launchTime);
+                genExecsAndComps(topology), launchTime, owner);
         return topo;
     }
 

Reply via email to