http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java 
b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
index 4de0cfa..bbb1e6c 100644
--- a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
+++ b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
@@ -20,6 +20,7 @@ package org.apache.storm.security;
 import org.apache.storm.daemon.Shutdownable;
 
 import java.util.Map;
+import org.apache.storm.generated.StormTopology;
 
 /**
  * Nimbus auto credential plugin that will be called on nimbus host
@@ -29,8 +30,8 @@ import java.util.Map;
 public interface INimbusCredentialPlugin extends Shutdownable {
 
     /**
-     * this method will be called when nimbus initializes.
-     * @param conf
+     * This method will be called when nimbus initializes.
+     * @param conf the cluster config
      */
     void prepare(Map conf);
 
@@ -39,9 +40,22 @@ public interface INimbusCredentialPlugin extends 
Shutdownable {
      * at least once during the submit Topology action. It will be not be 
called during activate instead
      * the credentials return by this method will be merged with the other 
credentials in the topology
      * and stored in zookeeper.
+     * NOTE: THIS METHOD WILL BE CALLED THROUGH REFLECTION.  Existing compiled 
implementations will still
+     * work but new implementations will not compile.  A NOOP implementation 
can be added to make it compile.
      * @param credentials credentials map where more credentials will be added.
-     * @param conf topology configuration
-     * @return
+     * @param topoConf topology configuration
      */
-    void populateCredentials(Map<String, String> credentials, Map conf);
+    @Deprecated
+    void populateCredentials(Map<String, String> credentials, Map topoConf);
+
+    /**
+     * Method that will be called on nimbus as part of submit topology. This 
plugin will be called
+     * at least once during the submit Topology action. It will be not be 
called during activate instead
+     * the credentials return by this method will be merged with the other 
credentials in the topology
+     * and stored in zookeeper.
+     * @param credentials credentials map where more credentials will be added.
+     * @param topoConf topology configuration
+     * @param topologyOwnerPrincipal the full principal name of the owner of 
the topology
+     */
+    void populateCredentials(Map<String, String> credentials, Map<String, 
Object> topoConf, final String topologyOwnerPrincipal);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java 
b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
index 1f947aa..43690b3 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
@@ -35,6 +35,20 @@ public interface ICredentialsRenewer {
      * Renew any credentials that need to be renewed. (Update the credentials 
if needed)
      * @param credentials the credentials that may have something to renew.
      * @param topologyConf topology configuration.
-     */ 
-    public void renew(Map<String, String> credentials, Map topologyConf);
+     * @param topologyOwnerPrincipal the full principal name of the owner of 
the topology
+     */
+    @SuppressWarnings("deprecation")
+    void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal);
+
+    /**
+     * Renew any credentials that need to be renewed. (Update the credentials 
if needed)
+     * NOTE: THIS WILL BE CALLED THROUGH REFLECTION.  So if the newer renew 
exists it will be called instead,
+     * but if it does not exist this will be called.  That means that this is 
binary compatible but not source
+     * compatible with older version.  To make the compilation work this can 
become a noop when the new API
+     * is implemented.
+     * @param credentials the credentials that may have something to renew.
+     * @param topologyConf topology configuration.
+     */
+    @Deprecated
+    void renew(Map<String, String>  credentials, Map topologyConf);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java 
b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index d02c4e3..3d78d48 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -202,7 +202,7 @@ public class AutoTGT implements IAutoCredentials, 
ICredentialsRenewer {
     }
 
     @Override
-    public void renew(Map<String,String> credentials, Map topologyConf) {
+    public void renew(Map<String,String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal) {
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             long refreshTime = getRefreshTime(tgt);
@@ -219,6 +219,10 @@ public class AutoTGT implements IAutoCredentials, 
ICredentialsRenewer {
         }
     }
 
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
     public static void main(String[] args) throws Exception {
         AutoTGT at = new AutoTGT();
         Map conf = new java.util.HashMap();

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py 
b/storm-core/src/py/storm/ttypes.py
index 8262285..8091910 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -9113,6 +9113,7 @@ class Assignment:
    - executor_node_port
    - executor_start_time_secs
    - worker_resources
+   - owner
   """
 
   thrift_spec = (
@@ -9126,9 +9127,11 @@ class Assignment:
     }, ), # 4
     (5, TType.MAP, 'worker_resources', (TType.STRUCT,(NodeInfo, 
NodeInfo.thrift_spec),TType.STRUCT,(WorkerResources, 
WorkerResources.thrift_spec)), {
     }, ), # 5
+    None, # 6
+    (7, TType.STRING, 'owner', None, None, ), # 7
   )
 
-  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], 
executor_node_port=thrift_spec[3][4], 
executor_start_time_secs=thrift_spec[4][4], 
worker_resources=thrift_spec[5][4],):
+  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], 
executor_node_port=thrift_spec[3][4], 
executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4], 
owner=None,):
     self.master_code_dir = master_code_dir
     if node_host is self.thrift_spec[2][4]:
       node_host = {
@@ -9146,6 +9149,7 @@ class Assignment:
       worker_resources = {
     }
     self.worker_resources = worker_resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9218,6 +9222,11 @@ class Assignment:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9270,6 +9279,10 @@ class Assignment:
         viter601.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 7)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9286,6 +9299,7 @@ class Assignment:
     value = (value * 31) ^ hash(self.executor_node_port)
     value = (value * 31) ^ hash(self.executor_start_time_secs)
     value = (value * 31) ^ hash(self.worker_resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):
@@ -9391,6 +9405,7 @@ class StormBase:
    - topology_action_options
    - prev_status
    - component_debug
+   - principal
   """
 
   thrift_spec = (
@@ -9404,9 +9419,10 @@ class StormBase:
     (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, 
TopologyActionOptions.thrift_spec), None, ), # 7
     (8, TType.I32, 'prev_status', None, None, ), # 8
     (9, TType.MAP, 'component_debug', 
(TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), 
None, ), # 9
+    (10, TType.STRING, 'principal', None, None, ), # 10
   )
 
-  def __init__(self, name=None, status=None, num_workers=None, 
component_executors=None, launch_time_secs=None, owner=None, 
topology_action_options=None, prev_status=None, component_debug=None,):
+  def __init__(self, name=None, status=None, num_workers=None, 
component_executors=None, launch_time_secs=None, owner=None, 
topology_action_options=None, prev_status=None, component_debug=None, 
principal=None,):
     self.name = name
     self.status = status
     self.num_workers = num_workers
@@ -9416,6 +9432,7 @@ class StormBase:
     self.topology_action_options = topology_action_options
     self.prev_status = prev_status
     self.component_debug = component_debug
+    self.principal = principal
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9485,6 +9502,11 @@ class StormBase:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 10:
+        if ftype == TType.STRING:
+          self.principal = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9539,6 +9561,10 @@ class StormBase:
         viter619.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.principal is not None:
+      oprot.writeFieldBegin('principal', TType.STRING, 10)
+      oprot.writeString(self.principal.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9563,6 +9589,7 @@ class StormBase:
     value = (value * 31) ^ hash(self.topology_action_options)
     value = (value * 31) ^ hash(self.prev_status)
     value = (value * 31) ^ hash(self.component_debug)
+    value = (value * 31) ^ hash(self.principal)
     return value
 
   def __repr__(self):
@@ -9866,6 +9893,7 @@ class LocalAssignment:
    - topology_id
    - executors
    - resources
+   - owner
   """
 
   thrift_spec = (
@@ -9873,12 +9901,15 @@ class LocalAssignment:
     (1, TType.STRING, 'topology_id', None, None, ), # 1
     (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, 
ExecutorInfo.thrift_spec)), None, ), # 2
     (3, TType.STRUCT, 'resources', (WorkerResources, 
WorkerResources.thrift_spec), None, ), # 3
+    None, # 4
+    (5, TType.STRING, 'owner', None, None, ), # 5
   )
 
-  def __init__(self, topology_id=None, executors=None, resources=None,):
+  def __init__(self, topology_id=None, executors=None, resources=None, 
owner=None,):
     self.topology_id = topology_id
     self.executors = executors
     self.resources = resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9911,6 +9942,11 @@ class LocalAssignment:
           self.resources.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9936,6 +9972,10 @@ class LocalAssignment:
       oprot.writeFieldBegin('resources', TType.STRUCT, 3)
       self.resources.write(oprot)
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 5)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9952,6 +9992,7 @@ class LocalAssignment:
     value = (value * 31) ^ hash(self.topology_id)
     value = (value * 31) ^ hash(self.executors)
     value = (value * 31) ^ hash(self.resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 146591f..6c395cc 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -473,6 +473,8 @@ struct Assignment {
     3: optional map<list<i64>, NodeInfo> executor_node_port = {};
     4: optional map<list<i64>, i64> executor_start_time_secs = {};
     5: optional map<NodeInfo, WorkerResources> worker_resources = {};
+    //6: from other pull request
+    7: optional string owner;
 }
 
 enum TopologyStatus {
@@ -497,6 +499,7 @@ struct StormBase {
     7: optional TopologyActionOptions topology_action_options;
     8: optional TopologyStatus prev_status;//currently only used during 
rebalance action.
     9: optional map<string, DebugOptions> component_debug; // 
topology/component level debug option.
+   10: optional string principal;
 }
 
 struct ClusterWorkerHeartbeat {
@@ -519,6 +522,8 @@ struct LocalAssignment {
   1: required string topology_id;
   2: required list<ExecutorInfo> executors;
   3: optional WorkerResources resources;
+  //4: other pull request
+  5: optional string owner;
 }
 
 struct LSSupervisorId {

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj 
b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index e082768..55b686e 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -171,14 +171,14 @@
 (deftest test-storm-cluster-state-basics
   (with-inprocess-zookeeper zk-port
     (let [state (mk-storm-state zk-port)
-          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {})
-          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {})
+          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {} "")
+          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {} "")
           nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
           nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
           nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (current-time-secs) 
false "v1")
           nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (current-time-secs) 
false "v2")
-          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {})
-          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil 
{})]
+          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} 
"")
+          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} 
"")]
       (is (= [] (.assignments state nil)))
       (.set-assignment! state "storm1" assignment1)
       (is (= assignment1 (.assignment-info state "storm1" nil)))

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj 
b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 0c3ee9b..144f18c 100644
--- 
a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ 
b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -138,7 +138,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -175,7 +175,7 @@
                    5
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -216,7 +216,7 @@
                     executor2 "bolt1"
                     executor3 "bolt1"
                     executor4 "bolt1"
-                    executor5 "bolt2"})]
+                    executor5 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -255,7 +255,7 @@
                     executor2 "bolt1"
                     executor3 "bolt2"
                     executor4 "bolt3"
-                    executor5 "bolt4"})]
+                    executor5 "bolt4"} "user")]
     (let [node-map (Node/getAllNodesFrom single-cluster)
          free-pool (FreePool. )
          default-pool (DefaultPool. )]
@@ -302,7 +302,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})
+                    executor3 "bolt2"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"}
                     (StormTopology.)
@@ -310,7 +310,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -383,7 +383,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -427,7 +427,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -474,7 +474,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -483,7 +483,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -579,7 +579,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -588,7 +588,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -629,34 +629,31 @@
 (deftest test-multitenant-scheduler
   (let [supers (gen-supervisors 10)
        topology1 (TopologyDetails. "topology1"
-                   {TOPOLOGY-NAME "topology-name-1"
-                    TOPOLOGY-SUBMITTER-USER "userC"}
+                   {TOPOLOGY-NAME "topology-name-1"}
                    (StormTopology.)
                    4
                    (mk-ed-map [["spout1" 0 5]
                                ["bolt1" 5 10]
                                ["bolt2" 10 15]
-                               ["bolt3" 15 20]]))
+                               ["bolt3" 15 20]]) "userC")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-ISOLATED-MACHINES 2
-                     TOPOLOGY-SUBMITTER-USER "userA"}
+                     TOPOLOGY-ISOLATED-MACHINES 2}
                     (StormTopology.)
                     4
                     (mk-ed-map [["spout11" 0 5]
                                 ["bolt12" 5 6]
                                 ["bolt13" 6 7]
-                                ["bolt14" 7 10]]))
+                                ["bolt14" 7 10]]) "userA")
        topology3 (TopologyDetails. "topology3"
                     {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-ISOLATED-MACHINES 5
-                     TOPOLOGY-SUBMITTER-USER "userB"}
+                     TOPOLOGY-ISOLATED-MACHINES 5}
                     (StormTopology.)
                     10
                     (mk-ed-map [["spout21" 0 10]
                                 ["bolt22" 10 20]
                                 ["bolt23" 20 30]
-                                ["bolt24" 30 40]]))
+                                ["bolt24" 30 40]]) "userB")
        cluster (Cluster. (nimbus/standalone-nimbus) supers {} nil)
        node-map (Node/getAllNodesFrom cluster)
        topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
@@ -682,14 +679,13 @@
 (deftest test-force-free-slot-in-bad-state
   (let [supers (gen-supervisors 1)
         topology1 (TopologyDetails. "topology1"
-                                    {TOPOLOGY-NAME "topology-name-1"
-                                     TOPOLOGY-SUBMITTER-USER "userC"}
+                                    {TOPOLOGY-NAME "topology-name-1"}
                                     (StormTopology.)
                                     4
                                     (mk-ed-map [["spout1" 0 5]
                                                 ["bolt1" 5 10]
                                                 ["bolt2" 10 15]
-                                                ["bolt3" 15 20]]))
+                                                ["bolt3" 15 20]]) "userC")
         existing-assignments {
                                "topology1" (SchedulerAssignmentImpl. 
"topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
                                                                                
   (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
@@ -719,25 +715,22 @@
   (testing "Assiging same worker slot to different topologies is bad state"
     (let [supers (gen-supervisors 5)
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout1" 0 1]]))
+                      (mk-ed-map [["spout1" 0 1]]) "userC")
           topology2 (TopologyDetails. "topology2"
                       {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-ISOLATED-MACHINES 2
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                       TOPOLOGY-ISOLATED-MACHINES 2}
                       (StormTopology.)
                       2
-                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]))
+                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]) "userA")
           topology3 (TopologyDetails. "topology3"
                       {TOPOLOGY-NAME "topology-name-3"
-                       TOPOLOGY-ISOLATED-MACHINES 1
-                       TOPOLOGY-SUBMITTER-USER "userB"}
+                       TOPOLOGY-ISOLATED-MACHINES 1}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout21" 2 3]]))
+                      (mk-ed-map [["spout21" 2 3]]) "userB")
           worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1)
           existing-assignments {"topology2" (SchedulerAssignmentImpl. 
"topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments})
                                 "topology3" (SchedulerAssignmentImpl. 
"topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})}
@@ -761,11 +754,10 @@
     (let [supers (gen-supervisors 1)
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout11" 0 1]]))
+                      (mk-ed-map [["spout11" 0 1]]) "userA")
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
                                   {(ExecutorDetails. 0 0) (WorkerSlot. 
"super0" port-not-reported-by-supervisor)})}
@@ -787,19 +779,17 @@
           dead-supervisor "super1"
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout11" 0 1]
-                                  ["bolt12" 1 2]]))
+                                  ["bolt12" 1 2]]) "userA")
           topology2 (TopologyDetails. "topology2"
-                      {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-2"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout21" 4 5]
-                                  ["bolt22" 5 6]]))
+                                  ["bolt22" 5 6]]) "userA")
           worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1)
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
@@ -837,11 +827,10 @@
         supers {"super1" super1 "super2" super2}
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userA"
                      TOPOLOGY-ISOLATED-MACHINES 1}
                     (StormTopology.)
                     7
-                    (mk-ed-map [["spout21" 0 7]]))
+                    (mk-ed-map [["spout21" 0 7]]) "userA")
         existing-assignments {"topology1"
                               (SchedulerAssignmentImpl. "topology1"
                                 {(ExecutorDetails. 0 0) (WorkerSlot. "super1" 
1)

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
 
b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
index ec51914..6210c33 100644
--- 
a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ 
b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
@@ -101,8 +101,8 @@
         cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
         topologies (Topologies. (to-top-map []))
         node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
-        topology1 (TopologyDetails. "topology1" {} nil 0)
-        topology2 (TopologyDetails. "topology2" {} nil 0)]
+        topology1 (TopologyDetails. "topology1" {} nil 0 "user")
+        topology2 (TopologyDetails. "topology2" {} nil 0 "user")]
     (is (= 5 (.size node-map)))
     (let [node (.get node-map "id0")]
       (is (= "id0" (.getId node)))
@@ -152,7 +152,6 @@
         storm-topology (.createTopology builder)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -162,7 +161,7 @@
                     storm-topology
                     1
                     (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
+                                ["wordCountBolt" 1 2]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    
"org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -195,7 +194,6 @@
         storm-topology1 (.createTopology builder1)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -210,14 +208,13 @@
                                 ["wordCountBolt2" 3 4]
                                 ["wordCountBolt3" 4 5]
                                 ["wordCountBolt4" 5 6]
-                                ["wordCountBolt5" 6 7]]))
+                                ["wordCountBolt5" 6 7]]) "userC")
         builder2 (TopologyBuilder.)  ;; a topology with two unconnected 
partitions
         _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1)
         _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1)
         storm-topology2 (.createTopology builder1)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -227,7 +224,7 @@
                     storm-topology2
                     1
                     (mk-ed-map [["wordSpoutX" 0 1]
-                                ["wordSpoutY" 1 2]]))
+                                ["wordSpoutY" 1 2]]) "userC")
         supers (gen-supervisors 2 4)
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
@@ -264,7 +261,6 @@
         storm-topology (.createTopology builder)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -274,7 +270,7 @@
                     storm-topology
                     2
                     (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
+                                ["wordCountBolt" 1 2]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"})
@@ -305,7 +301,6 @@
         storm-topology (.createTopology builder)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -315,7 +310,7 @@
                     storm-topology
                     2 ;; need two workers, each on one node
                     (mk-ed-map [["wordSpout" 0 2]
-                                ["wordCountBolt" 2 3]]))
+                                ["wordCountBolt" 2 3]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    
"org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -365,7 +360,6 @@
          storm-topology1 (.createTopology builder1)
          topology1 (TopologyDetails. "topology1"
                      {TOPOLOGY-NAME "topology-name-1"
-                      TOPOLOGY-SUBMITTER-USER "userC"
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -374,13 +368,12 @@
                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology1
                      3 ;; three workers to hold three executors
-                     (mk-ed-map [["spout1" 0 3]]))
+                     (mk-ed-map [["spout1" 0 3]]) "userC")
          builder2 (TopologyBuilder.)
          _ (.setSpout builder2 "spout2" (TestWordSpout.) 2)
          storm-topology2 (.createTopology builder2)
          topology2 (TopologyDetails. "topology2"
                      {TOPOLOGY-NAME "topology-name-2"
-                      TOPOLOGY-SUBMITTER-USER "userC"
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; 
large enough thus two eds can not be fully assigned to one node
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -389,7 +382,7 @@
                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology2
                      2  ;; two workers, each holds one executor and resides on 
one node
-                     (mk-ed-map [["spout2" 0 2]]))
+                     (mk-ed-map [["spout2" 0 2]]) "userC")
         scheduler (ResourceAwareScheduler.)]
 
     (testing "When a worker fails, RAS does not alter existing assignments on 
healthy workers"
@@ -512,7 +505,6 @@
         storm-topology1 (.createTopology builder1)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -521,7 +513,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology1
                     1
-                    (mk-ed-map [["spout1" 0 1]]))
+                    (mk-ed-map [["spout1" 0 1]]) "userC")
         builder2 (TopologyBuilder.)  ;; topo2 has 4 large tasks
         _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4)
             (.setMemoryLoad 500.0 12.0)
@@ -529,7 +521,6 @@
         storm-topology2 (.createTopology builder2)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -538,7 +529,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology2
                     2
-                    (mk-ed-map [["spout2" 0 4]]))
+                    (mk-ed-map [["spout2" 0 4]]) "userC")
         builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching 
topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
         _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
             (.setMemoryLoad 200.0 56.0)
@@ -546,7 +537,6 @@
         storm-topology3 (.createTopology builder3)
         topology3 (TopologyDetails. "topology3"
                     {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -555,7 +545,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology3
                     2
-                    (mk-ed-map [["spout3" 0 4]]))
+                    (mk-ed-map [["spout3" 0 4]]) "userC")
         builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem 
req does not exactly divide a node's mem capacity
         _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
             (.setMemoryLoad 100.0 0.0)
@@ -563,7 +553,6 @@
         storm-topology4 (.createTopology builder4)
         topology4 (TopologyDetails. "topology4"
                     {TOPOLOGY-NAME "topology-name-4"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -572,7 +561,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology4
                     2
-                    (mk-ed-map [["spout4" 0 12]]))
+                    (mk-ed-map [["spout4" 0 12]]) "userC")
         builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be 
able to exactly use up both the cpu and mem in teh cluster
         _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
             (.setMemoryLoad 100.0 28.0)
@@ -580,7 +569,6 @@
         storm-topology5 (.createTopology builder5)
         topology5 (TopologyDetails. "topology5"
                     {TOPOLOGY-NAME "topology-name-5"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -589,7 +577,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology5
                     2
-                    (mk-ed-map [["spout5" 0 40]]))
+                    (mk-ed-map [["spout5" 0 40]]) "userC")
         epsilon 0.000001
         topologies (Topologies. (to-top-map [topology1 topology2]))]
 
@@ -661,7 +649,6 @@
           storm-topology1 (.createTopology builder1)
           topology1 (TopologyDetails. "topology1"
                       {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -670,7 +657,7 @@
                        TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 4]]))
+                      (mk-ed-map [["spout1" 0 4]]) "userA")
           topologies (Topologies. (to-top-map [topology1]))]
       (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY 
DEFAULT_EVICTION_STRATEGY
                            RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY 
DEFAULT_PRIORITY_STRATEGY})
@@ -687,7 +674,6 @@
           storm-topology1 (.createTopology builder1)
           topology1 (TopologyDetails. "topology1"
                       {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -696,7 +682,7 @@
                        TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 5]]))
+                      (mk-ed-map [["spout1" 0 5]]) "userC")
           topologies (Topologies. (to-top-map [topology1]))]
       (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY 
DEFAULT_EVICTION_STRATEGY
                            RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY 
DEFAULT_PRIORITY_STRATEGY})
@@ -715,7 +701,6 @@
           _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
           storm-topology1 (.createTopology builder1)
           conf  {TOPOLOGY-NAME "topology-name-1"
-                 TOPOLOGY-SUBMITTER-USER "userC"
                  TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
                  TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                  TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -726,7 +711,7 @@
                       conf
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 5]]))
+                      (mk-ed-map [["spout1" 0 5]]) "userC")
           topologies (Topologies. (to-top-map [topology1]))]
       (is (thrown? IllegalArgumentException
             (StormSubmitter/submitTopologyWithProgressBar "test" conf 
storm-topology1)))

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj 
b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
index fc6e8e3..a6c5268 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
@@ -69,13 +69,13 @@
         executor2 (ExecutorDetails. (int 6) (int 10))
         topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME 
"topology-name-1"} (StormTopology.) 1
                                    {executor1 "spout1"
-                                    executor2 "bolt1"})
+                                    executor2 "bolt1"} "user")
         ;; test topology.selectExecutorToComponent
         executor->comp (.selectExecutorToComponent topology1 (list executor1))
         _ (is (= (clojurify-executor->comp {executor1 "spout1"})
                  (clojurify-executor->comp executor->comp)))
         ;; test topologies.getById
-        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"} (StormTopology.) 1 {})
+        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"} (StormTopology.) 1 {} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2})
         _ (is (= "topology1" (->> "topology1"
                                   (.getById topologies)
@@ -104,19 +104,19 @@
                                     2
                                     {executor1 "spout1"
                                      executor2 "bolt1"
-                                     executor3 "bolt2"})
+                                     executor3 "bolt2"} "user")
         ;; topology2 is fully scheduled
         topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME 
"topology-name-2"}
                                     (StormTopology.)
                                     2
                                     {executor11 "spout11"
-                                     executor12 "bolt12"})
+                                     executor12 "bolt12"} "user")
         ;; topology3 needs scheduling, since the assignment is squeezed
         topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME 
"topology-name-3"}
                                     (StormTopology.)
                                     2
                                     {executor21 "spout21"
-                                     executor22 "bolt22"})
+                                     executor22 "bolt22"} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2 
"topology3" topology3})
         executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1))
                          executor2 (WorkerSlot. "supervisor2" (int 2))}

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
index 0958d4d..07427bd 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -160,8 +160,7 @@ public class ContainerTest {
         
         final List<String> topoGroups = Arrays.asList("t-group-a", 
"t-group-b");
         final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
-        
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+
         topoConf.put(Config.LOGS_GROUPS, logGroups);
         topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
         topoConf.put(Config.LOGS_USERS, logUsers);
@@ -181,6 +180,7 @@ public class ContainerTest {
         
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", 8080, la, workerId, topoConf, ops);
         
@@ -233,7 +233,6 @@ public class ContainerTest {
         final File workerPidsRoot = new File(workerRoot, "pids");
         
         final Map<String, Object> topoConf = new HashMap<>();
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
@@ -248,6 +247,7 @@ public class ContainerTest {
         when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
         
         LocalAssignment la = new LocalAssignment();
+        la.set_owner(user);
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", port, la, workerId, topoConf, ops);

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
index ed2b4ff..fe8392a 100644
--- a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -50,8 +50,10 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadBaseTopologyBlobs() throws Exception {
         final String topoId = "TOPO";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
@@ -97,7 +99,7 @@ public class AsyncLocalizerTest {
             //Extracting the dir from the jar
             verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), 
eq("resources"), any(File.class));
             verify(ops).moveDirectoryPreferAtomic(any(File.class), 
eq(fStormRoot));
-            verify(ops).setupStormCodeDir(topoConf, fStormRoot);
+            verify(ops).setupStormCodeDir(user, fStormRoot);
             
             verify(ops, never()).deleteIfExists(any(File.class));
         } finally {
@@ -110,15 +112,16 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadTopologyBlobs() throws Exception {
         final String topoId = "TOPO-12345";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
         la.add_to_executors(ei);
         final String topoName = "TOPO";
         final int port = 8080;
-        final String user = "user";
         final String simpleLocalName = "simple.txt";
         final String simpleKey = "simple";
         
@@ -150,7 +153,6 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> topoConf = new HashMap<>(conf);
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         topoConf.put(Config.TOPOLOGY_NAME, topoName);
         
         List<LocalizedResource> localizedList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 44f2136..cecc6d1 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -79,7 +79,6 @@ public class TestResourceAwareScheduler {
         
defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
0.0);
         defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 
8192.0);
         defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
-        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
     }
 
     @Test
@@ -103,7 +102,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(0, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors11 = new ArrayList<>();
         executors11.add(new ExecutorDetails(1, 1));
@@ -123,7 +123,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals(2, node.totalSlotsUsed());
         Assert.assertEquals(4, node.totalSlots());
 
-        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0);
+        TopologyDetails topology2 = 
TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 
0, 2, 0, 0, 0,
+            "user");
 
         List<ExecutorDetails> executors21 = new ArrayList<>();
         executors21.add(new ExecutorDetails(1, 1));
@@ -165,7 +166,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
-        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0);
+        TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 
0, 0, "user");
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         topoMap.put(topology1.getId(), topology1);
         Topologies topologies = new Topologies(topoMap);
@@ -208,14 +209,14 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder(); // a topology with 
two unconnected partitions
         builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
         builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
         StormTopology stormTopology2 = builder2.createTopology();
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config, 
stormTopology2, 0, executorMap2, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -271,7 +272,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 0, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -319,7 +320,7 @@ public class TestResourceAwareScheduler {
         Config config = new Config();
         config.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, 
stormTopology1, 2, executorMap1, 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -409,7 +410,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 3, executorMap1, 0, "user");
 
         TopologyBuilder builder2 = new TopologyBuilder();
         builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
@@ -419,7 +420,7 @@ public class TestResourceAwareScheduler {
         // memory requirement is large enough so that two executors can not be 
fully assigned to one node
         config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
1280.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 2, executorMap2, 0, "user");
 
         // Test1: When a worker fails, RAS does not alter existing assignments 
on healthy workers
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -567,7 +568,7 @@ public class TestResourceAwareScheduler {
         Config config1 = new Config();
         config1.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
 
         // topo2 has 4 large tasks
         TopologyBuilder builder2 = new TopologyBuilder();
@@ -576,7 +577,7 @@ public class TestResourceAwareScheduler {
         Config config2 = new Config();
         config2.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
 
         // topo3 has 4 large tasks
         TopologyBuilder builder3 = new TopologyBuilder();
@@ -585,7 +586,7 @@ public class TestResourceAwareScheduler {
         Config config3 = new Config();
         config3.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap3 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3);
-        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0);
+        TopologyDetails topology3 = new TopologyDetails("topology3", config2, 
stormTopology3, 1, executorMap3, 0, "user");
 
         // topo4 has 12 small tasks, whose mem usage does not exactly divide a 
node's mem capacity
         TopologyBuilder builder4 = new TopologyBuilder();
@@ -594,7 +595,7 @@ public class TestResourceAwareScheduler {
         Config config4 = new Config();
         config4.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap4 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4);
-        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0);
+        TopologyDetails topology4 = new TopologyDetails("topology4", config4, 
stormTopology4, 1, executorMap4, 0, "user");
 
         // topo5 has 40 small tasks, it should be able to exactly use up both 
the cpu and mem in the cluster
         TopologyBuilder builder5 = new TopologyBuilder();
@@ -603,7 +604,7 @@ public class TestResourceAwareScheduler {
         Config config5 = new Config();
         config5.putAll(defaultTopologyConf);
         Map<ExecutorDetails, String> executorMap5 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5);
-        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0);
+        TopologyDetails topology5 = new TopologyDetails("topology5", config5, 
stormTopology5, 1, executorMap5, 0, "user");
 
         // Test1: Launch topo 1-3 together, it should be able to use up either 
mem or cpu resource due to exact division
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
@@ -691,7 +692,7 @@ public class TestResourceAwareScheduler {
         config1.putAll(defaultTopologyConf);
         config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1);
-        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, 
stormTopology1, 1, executorMap1, 0, "user");
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config1);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         Map<String, TopologyDetails> topoMap = new HashMap<>();
@@ -713,7 +714,7 @@ public class TestResourceAwareScheduler {
         config2.putAll(defaultTopologyConf);
         config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
         Map<ExecutorDetails, String> executorMap2 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2);
-        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, 
stormTopology2, 1, executorMap2, 0, "user");
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config2);
         topoMap = new HashMap<>();
         topoMap.put(topology2.getId(), topology2);
@@ -769,16 +770,17 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 20);
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "bobby");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -828,8 +830,6 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
128.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
-
         Map<String, Map<String, Number>> resourceUserPool = new 
HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -845,11 +845,16 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 30);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 30);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 30);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            30, TOPOLOGY_SUBMITTER);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -894,7 +899,8 @@ public class TestResourceAwareScheduler {
         LOG.info("{} - {}", topo.getName(), queue);
         Assert.assertEquals("check order", topo.getName(), "topo-2");
 
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30, 10);
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 30,
+            10, TOPOLOGY_SUBMITTER);
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
@@ -954,29 +960,38 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8, 29);
-        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16, 29);
-        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16, 20);
-        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "jerry");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "jerry");
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "jerry");
+
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "bobby");
+        TopologyDetails topo7 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "bobby");
+        TopologyDetails topo8 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "bobby");
+        TopologyDetails topo9 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "bobby");
+        TopologyDetails topo10 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "bobby");
+
+        TopologyDetails topo11 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, 
currentTime - 2,
+            20, "derek");
+        TopologyDetails topo12 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, 
currentTime - 8,
+            29, "derek");
+        TopologyDetails topo13 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, 
currentTime - 16,
+            29, "derek");
+        TopologyDetails topo14 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, 
currentTime - 16,
+            20, "derek");
+        TopologyDetails topo15 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, 
currentTime - 24,
+            29, "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1040,10 +1055,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8, 29,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1103,20 +1118,20 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, 
resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, 
currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10);
-        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29);
-        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10);
+        TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, 
currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, 
currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1228,8 +1243,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, 
currentTime - 2, 29,
+            "user");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, 
currentTime - 2, 10,
+            "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1283,11 +1300,12 @@ public class TestResourceAwareScheduler {
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10);
-        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20);
-        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, 
currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, 
currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, 
currentTime - 2, 20,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1343,13 +1361,10 @@ public class TestResourceAwareScheduler {
 
         StormTopology stormTopology = builder.createTopology();
         TopologyDetails topo = new TopologyDetails("topo-1", config, 
stormTopology,
-                0,
-                genExecsAndComps(stormTopology), 0);
+                0, genExecsAndComps(stormTopology), 0, "jerry");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, 
TopologyDetails>();
         topoMap.put(topo.getId(), topo);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
index 93e4b75..ce4d707 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
@@ -42,7 +42,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -64,7 +64,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = 
TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -95,7 +95,8 @@ public class TestUser {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
 
-        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, 
Time.currentTimeSecs() - 24, 9);
+        TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1,
+            Time.currentTimeSecs() - 24, 9, "user1");
 
         User user1 = new User("user1", resourceGuaranteeMap);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index bc69725..f745621 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -63,20 +63,20 @@ public class TestUtilsForResourceAwareScheduler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
-    public static List<TopologyDetails> getListOfTopologies(Config config) {
-
-        List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
-
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", 
config, 5, 15, 1, 1, currentTime - 2, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", 
config, 5, 15, 1, 1, currentTime - 8, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", 
config, 5, 15, 1, 1, currentTime - 16, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", 
config, 5, 15, 1, 1, currentTime - 16, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", 
config, 5, 15, 1, 1, currentTime - 24, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", 
config, 5, 15, 1, 1, currentTime - 2, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", 
config, 5, 15, 1, 1, currentTime - 8, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", 
config, 5, 15, 1, 1, currentTime - 16, 15));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", 
config, 5, 15, 1, 1, currentTime - 16, 8));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", 
config, 5, 15, 1, 1, currentTime - 24, 9));
+    public static List<TopologyDetails> getListOfTopologies(Config config, 
String user) {
+
+        List<TopologyDetails> topos = new LinkedList<>();
+
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", 
config, 5, 15, 1, 1, currentTime - 2, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", 
config, 5, 15, 1, 1, currentTime - 8, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", 
config, 5, 15, 1, 1, currentTime - 16, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", 
config, 5, 15, 1, 1, currentTime - 16, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", 
config, 5, 15, 1, 1, currentTime - 24, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", 
config, 5, 15, 1, 1, currentTime - 2, 0, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", 
config, 5, 15, 1, 1, currentTime - 8, 0, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", 
config, 5, 15, 1, 1, currentTime - 16, 15, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", 
config, 5, 15, 1, 1, currentTime - 16, 8, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", 
config, 5, 15, 1, 1, currentTime - 24, 9, user));
         return topos;
     }
 
@@ -140,8 +140,8 @@ public class TestUtilsForResourceAwareScheduler {
         return retMap;
     }
 
-    public static TopologyDetails getTopology(String name, Map config, int 
numSpout, int numBolt,
-                                              int spoutParallelism, int 
boltParallelism, int launchTime, int priority) {
+    public static TopologyDetails getTopology(String name, Map<String, Object> 
config, int numSpout, int numBolt,
+                                              int spoutParallelism, int 
boltParallelism, int launchTime, int priority, String owner) {
 
         Config conf = new Config();
         conf.putAll(config);
@@ -151,7 +151,7 @@ public class TestUtilsForResourceAwareScheduler {
         StormTopology topology = buildTopology(numSpout, numBolt, 
spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, 
conf, topology,
                 0,
-                genExecsAndComps(topology), launchTime);
+                genExecsAndComps(topology), launchTime, owner);
         return topo;
     }
 

Reply via email to