User improvements

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9980b68d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9980b68d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9980b68d

Branch: refs/heads/1.1.x-branch
Commit: 9980b68d521dfdf5a83577ac361949293751c618
Parents: d83b0b9
Author: Robert Evans <[email protected]>
Authored: Fri Jul 21 13:50:15 2017 -0500
Committer: Robert Evans <[email protected]>
Committed: Fri Jul 21 13:50:15 2017 -0500

----------------------------------------------------------------------
 .../backends/trident/TestPlanCompiler.java      |   2 +-
 .../apache/storm/hbase/security/AutoHBase.java  |  34 ++--
 .../storm/hdfs/common/security/AutoHDFS.java    |  40 ++--
 .../src/clj/org/apache/storm/converter.clj      |  11 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |   4 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  71 +++++--
 .../storm/cluster/StormClusterStateImpl.java    |   4 +
 .../storm/daemon/supervisor/AdvancedFSOps.java  |  16 +-
 .../storm/daemon/supervisor/Container.java      |   6 +-
 .../daemon/supervisor/ReadClusterState.java     |   3 +
 .../storm/daemon/supervisor/Supervisor.java     |  16 +-
 .../daemon/supervisor/SupervisorUtils.java      |  11 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |  17 +-
 .../org/apache/storm/generated/Assignment.java  | 114 +++++++++++-
 .../apache/storm/generated/LocalAssignment.java | 114 +++++++++++-
 .../org/apache/storm/generated/StormBase.java   | 114 +++++++++++-
 .../apache/storm/localizer/AsyncLocalizer.java  |  37 ++--
 .../apache/storm/scheduler/TopologyDetails.java |  33 ++--
 .../multitenant/MultitenantScheduler.java       |   2 +-
 .../storm/security/INimbusCredentialPlugin.java |  24 ++-
 .../security/auth/ICredentialsRenewer.java      |  18 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |   6 +-
 storm-core/src/py/storm/ttypes.py               |  47 ++++-
 storm-core/src/storm.thrift                     |   5 +
 .../test/clj/org/apache/storm/cluster_test.clj  |   8 +-
 .../scheduler/multitenant_scheduler_test.clj    |  77 ++++----
 .../scheduler/resource_aware_scheduler_test.clj |  49 ++---
 .../clj/org/apache/storm/scheduler_test.clj     |  10 +-
 .../storm/daemon/supervisor/ContainerTest.java  |   6 +-
 .../storm/localizer/AsyncLocalizerTest.java     |   8 +-
 .../resource/TestResourceAwareScheduler.java    | 183 ++++++++++---------
 .../storm/scheduler/resource/TestUser.java      |   7 +-
 .../TestUtilsForResourceAwareScheduler.java     |  34 ++--
 .../eviction/TestDefaultEvictionStrategy.java   | 118 ++++++------
 .../TestDefaultResourceAwareStrategy.java       |   8 +-
 35 files changed, 858 insertions(+), 399 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 7b936df..1bf6e9c 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -51,7 +51,7 @@ import java.util.concurrent.Callable;
 import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
 
 public class TestPlanCompiler {
-  private static LocalCluster cluster;
+  private static ILocalCluster cluster;
 
   @BeforeClass
   public static void staticSetup() throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index a2ca68e..f20ee02 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -71,9 +71,9 @@ public class AutoHBase implements IAutoCredentials, 
ICredentialsRenewer, INimbus
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) 
{
+    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> conf, String owner) {
         try {
-            credentials.put(getCredentialKey(), 
DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            credentials.put(getCredentialKey(), 
DatatypeConverter.printBase64Binary(getHadoopCredentials(conf, owner)));
         } catch (Exception e) {
             LOG.error("Could not populate HBase credentials.", e);
         }
@@ -163,12 +163,10 @@ public class AutoHBase implements IAutoCredentials, 
ICredentialsRenewer, INimbus
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
+    protected byte[] getHadoopCredentials(Map<String, Object> conf, final 
String topologyOwnerPrincipal) {
         try {
             final Configuration hbaseConf = HBaseConfiguration.create();
             if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) 
conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 UserProvider provider = UserProvider.instantiate(hbaseConf);
 
                 hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
@@ -180,7 +178,7 @@ public class AutoHBase implements IAutoCredentials, 
ICredentialsRenewer, INimbus
 
                 UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
 
-                final UserGroupInformation proxyUser = 
UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+                final UserGroupInformation proxyUser = 
UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi);
 
                 User user = User.create(ugi);
 
@@ -208,9 +206,9 @@ public class AutoHBase implements IAutoCredentials, 
ICredentialsRenewer, INimbus
     }
 
     @Override
-    public void renew(Map<String, String> credentials, Map topologyConf) {
+    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String ownerPrincipal) {
         //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
+        populateCredentials(credentials, topologyConf, ownerPrincipal);
     }
 
     protected String getCredentialKey() {
@@ -220,24 +218,34 @@ public class AutoHBase implements IAutoCredentials, 
ICredentialsRenewer, INimbus
 
     @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm 
e.g. [email protected]
+        Map<String, Object> conf = new HashMap<>();
+        final String topologyOwnerPrincipal = args[0]; //with realm e.g. 
[email protected]
         conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal 
[email protected]
         conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab 
/etc/security/keytabs/storm-hbase.keytab
 
         AutoHBase autoHBase = new AutoHBase();
         autoHBase.prepare(conf);
 
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHBase.populateCredentials(creds, conf);
+        Map<String,String> creds  = new HashMap<>();
+        autoHBase.populateCredentials(creds, conf, topologyOwnerPrincipal);
         LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
 
         Subject s = new Subject();
         autoHBase.populateSubject(s, creds);
         LOG.info("Got a Subject " + s);
 
-        autoHBase.renew(creds, conf);
+        autoHBase.renew(creds, conf, topologyOwnerPrincipal);
         LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
     }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map 
topoConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
index ff3f9cc..5f370b8 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.hdfs.common.security;
 
-import org.apache.storm.Config;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.security.auth.ICredentialsRenewer;
@@ -71,9 +70,9 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) 
{
+    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topoConf, String topologyOwnerPrincipal) {
         try {
-            credentials.put(getCredentialKey(), 
DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            credentials.put(getCredentialKey(), 
DatatypeConverter.printBase64Binary(getHadoopCredentials(topoConf, 
topologyOwnerPrincipal)));
             LOG.info("HDFS tokens added to credentials map.");
         } catch (Exception e) {
             LOG.error("Could not populate HDFS credentials.", e);
@@ -85,8 +84,7 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
         credentials.put(HDFS_CREDENTIALS, 
DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
-    /*
- *
+    /**
  * @param credentials map with creds.
  * @return instance of org.apache.hadoop.security.Credentials.
  * this class's populateCredentials must have been called before.
@@ -167,7 +165,7 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
      */
     @Override
     @SuppressWarnings("unchecked")
-    public void renew(Map<String, String> credentials, Map topologyConf) {
+    public void renew(Map<String, String> credentials,Map<String, Object> 
topologyConf, String topologyOwnerPrincipal) {
         try {
             Credentials credential = getCredentials(credentials);
             if (credential != null) {
@@ -189,26 +187,24 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
         } catch (Exception e) {
             LOG.warn("could not renew the credentials, one of the possible 
reason is tokens are beyond " +
                     "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials, topologyConf);
+            populateCredentials(credentials, topologyConf, 
topologyOwnerPrincipal);
         }
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
+    protected byte[] getHadoopCredentials(Map conf, final String 
topologyOwnerPrincipal) {
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
                 final Configuration configuration = new Configuration();
 
                 login(configuration);
 
-                final String topologySubmitterUser = (String) 
conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? 
new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
 
                 UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
 
-                final UserGroupInformation proxyUser = 
UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+                final UserGroupInformation proxyUser = 
UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi);
 
                 Credentials creds = (Credentials) proxyUser.doAs(new 
PrivilegedAction<Object>() {
                     @Override
@@ -218,7 +214,7 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
                             Credentials credential= proxyUser.getCredentials();
 
                             fileSystem.addDelegationTokens(hdfsPrincipal, 
credential);
-                            LOG.info("Delegation tokens acquired for user {}", 
topologySubmitterUser);
+                            LOG.info("Delegation tokens acquired for user {}", 
topologyOwnerPrincipal);
                             return credential;
                         } catch (IOException e) {
                             throw new RuntimeException(e);
@@ -257,8 +253,8 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
 
     @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm 
e.g. [email protected]
+        Map<String, Object> conf = new HashMap();
+        final String topologyOwnerPrincipal = args[0]; //with realm e.g. 
[email protected]
         conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. 
[email protected]
         conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// 
/etc/security/keytabs/storm.keytab
 
@@ -266,16 +262,26 @@ public class AutoHDFS implements IAutoCredentials, 
ICredentialsRenewer, INimbusC
         AutoHDFS autoHDFS = new AutoHDFS();
         autoHDFS.prepare(conf);
 
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
+        Map<String,String> creds  = new HashMap<>();
+        autoHDFS.populateCredentials(creds, conf, topologyOwnerPrincipal);
         LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
 
         Subject s = new Subject();
         autoHDFS.populateSubject(s, creds);
         LOG.info("Got a Subject "+ s);
 
-        autoHDFS.renew(creds, conf);
+        autoHDFS.renew(creds, conf, topologyOwnerPrincipal);
         LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
     }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map 
topoConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj 
b/storm-core/src/clj/org/apache/storm/converter.clj
index bb2dc87..828d425 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -69,6 +69,8 @@
                                                                
(.set_mem_off_heap (second resources))
                                                                (.set_cpu (last 
resources)))])
                                                           (:worker->resources 
assignment)))))
+    (if (:owner assignment)
+      (.set_owner thrift-assignment (:owner assignment)))
     thrift-assignment))
 
 (defn clojurify-executor->node_port [executor->node_port]
@@ -98,7 +100,8 @@
       (clojurify-executor->node_port (into {} (.get_executor_node_port 
assignment)))
       (map-key (fn [executor] (into [] executor))
         (into {} (.get_executor_start_time_secs assignment)))
-      (clojurify-worker->resources (into {} (.get_worker_resources 
assignment))))))
+      (clojurify-worker->resources (into {} (.get_worker_resources 
assignment)))
+      (.get_owner assignment))))
 
 (defn convert-to-symbol-from-status [status]
   (condp = status
@@ -188,7 +191,8 @@
     (.set_owner (:owner storm-base))
     (.set_topology_action_options (thriftify-topology-action-options 
storm-base))
     (.set_prev_status (convert-to-status-from-symbol (:prev-status 
storm-base)))
-    (.set_component_debug (map-val thriftify-debugoptions (:component->debug 
storm-base)))))
+    (.set_component_debug (map-val thriftify-debugoptions (:component->debug 
storm-base)))
+    (.set_principal (:principal storm-base))))
 
 (defn clojurify-storm-base [^StormBase storm-base]
   (if storm-base
@@ -201,7 +205,8 @@
       (.get_owner storm-base)
       (clojurify-topology-action-options (.get_topology_action_options 
storm-base))
       (convert-to-symbol-from-status (.get_prev_status storm-base))
-      (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
+      (map-val clojurify-debugoptions (.get_component_debug storm-base))
+      (.get_principal storm-base))))
 
 (defn thriftify-stats [stats]
   (if stats

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index db01727..9ee2261 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -66,11 +66,11 @@
 ;; the task id is the virtual port
 ;; node->host is here so that tasks know who to talk to just from assignment
 ;; this avoid situation where node goes down and task doesn't know what to do 
information-wise
-(defrecord Assignment [master-code-dir node->host executor->node+port 
executor->start-time-secs worker->resources])
+(defrecord Assignment [master-code-dir node->host executor->node+port 
executor->start-time-secs worker->resources owner])
 
 
 ;; component->executors is a map from spout/bolt id to number of executors for 
that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers 
component->executors owner topology-action-options prev-status 
component->debug])
+(defrecord StormBase [storm-name launch-time-secs status num-workers 
component->executors owner topology-action-options prev-status component->debug 
principal])
 
 (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta 
scheduler-meta uptime-secs version resources-map])
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 3bd8a17..7607b1b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -16,6 +16,7 @@
 (ns org.apache.storm.daemon.nimbus
   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
   (:import [org.apache.storm.generated KeyNotFoundException])
+  (:import [org.apache.storm.security INimbusCredentialPlugin])
   (:import [org.apache.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol 
TBinaryProtocol$Factory])
   (:import [org.apache.thrift.exception])
@@ -540,12 +541,23 @@
     (Utils/fromCompressedJsonConf
       (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
 
+(defn fixup-storm-base
+  [storm-base topo-conf]
+  (assoc storm-base
+         :owner (.get topo-conf TOPOLOGY-SUBMITTER-USER)
+         :principal (.get topo-conf TOPOLOGY-SUBMITTER-PRINCIPAL)))
+
 (defn read-topology-details [nimbus storm-id]
   (let [blob-store (:blob-store nimbus)
         storm-base (or
                      (.storm-base (:storm-cluster-state nimbus) storm-id nil)
                      (throw (NotAliveException. storm-id)))
         topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
+        storm-base (if (nil? (:principal storm-base))
+                      (let [new-sb (fixup-storm-base storm-base topology-conf)]
+                        (.update-storm! (:storm-cluster-state nimbus) storm-id 
new-sb)
+                        new-sb)
+                      storm-base)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
         executor->component (->> (compute-executor->component nimbus storm-id)
                                  (map-key (fn [[start-task end-task]]
@@ -555,7 +567,8 @@
                       topology
                       (:num-workers storm-base)
                       executor->component
-                      (:launch-time-secs storm-base))))
+                      (:launch-time-secs storm-base)
+                      (:owner storm-base))))
 
 ;; Does not assume that clocks are synchronized. Executor heartbeat is only 
used so that
 ;; nimbus knows when it's received a new heartbeat. All timing is done by 
nimbus and
@@ -959,6 +972,11 @@
 (defn- to-worker-slot [[node port]]
   (WorkerSlot. node port))
 
+(defn- fixup-assignment
+  [assignment td]
+  (assoc assignment
+         :owner (.getTopologySubmitter td)))
+
 ;; get existing assignment (just the executor->node+port map) -> default to {}
 ;; filter out ones which have a executor timeout
 ;; figure out available slots on cluster. add to that the used valid slots to 
get total slots. figure out how many executors should be in each slot (e.g., 4, 
4, 4, 5)
@@ -970,9 +988,9 @@
         storm-cluster-state (:storm-cluster-state nimbus)
         ^INimbus inimbus (:inimbus nimbus)
         ;; read all the topologies
-        topologies (locking (:submit-lock nimbus) (into {} (for [tid 
(.active-storms storm-cluster-state)]
+        tds (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms 
storm-cluster-state)]
                               {tid (read-topology-details nimbus tid)})))
-        topologies (Topologies. topologies)
+        topologies (Topologies. tds)
         ;; read all the assignments
         assigned-topology-ids (.assignments storm-cluster-state nil)
         existing-assignments (into {} (for [tid assigned-topology-ids]
@@ -980,7 +998,14 @@
                                         ;; we exclude its assignment, meaning 
that all the slots occupied by its assignment
                                         ;; will be treated as free slot in the 
scheduler code.
                                         (when (or (nil? scratch-topology-id) 
(not= tid scratch-topology-id))
-                                          {tid (.assignment-info 
storm-cluster-state tid nil)})))]
+                                          (let [assignment (.assignment-info 
storm-cluster-state tid nil)
+                                                td (.get tds tid)
+                                                assignment (if (and (not 
(:owner assignment)) (not (nil? td)))
+                                                             (let 
[new-assignment (fixup-assignment assignment td)]
+                                                               
(.set-assignment! storm-cluster-state tid new-assignment)
+                                                               new-assignment)
+                                                             assignment)]
+                                            {tid assignment}))))]
       ;; make the new assignments for topologies
       (locking (:sched-lock nimbus) (let [
           new-scheduler-assignments (compute-new-scheduler-assignments
@@ -1019,7 +1044,8 @@
                                                    (select-keys all-node->host 
all-nodes)
                                                    executor->node+port
                                                    start-times
-                                                   worker->resources)}))]
+                                                   worker->resources
+                                                   (.getTopologySubmitter 
(.get tds topology-id)))}))]
 
         (when (not= new-assignments existing-assignments)
           (log-debug "RESETTING id->resources and id->worker-resources cache!")
@@ -1052,7 +1078,7 @@
         (catch Exception e
         (log-warn-error e "Ignoring exception from Topology action notifier 
for storm-Id " storm-id))))))
 
-(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
+(defn- start-storm [nimbus storm-name storm-id topology-initial-status owner 
principal]
   {:pre [(#{:active :inactive} topology-initial-status)]}
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         conf (:conf nimbus)
@@ -1068,18 +1094,13 @@
                                   {:type topology-initial-status}
                                   (storm-conf TOPOLOGY-WORKERS)
                                   num-executors
-                                  (storm-conf TOPOLOGY-SUBMITTER-USER)
+                                  owner
                                   nil
                                   nil
-                                  {}))
+                                  {}
+                                  principal))
     (notify-topology-action-listener nimbus storm-name "activate")))
 
-;; Master:
-;; job submit:
-;; 1. read which nodes are available
-;; 2. set assignments
-;; 3. start storm - necessary in case master goes down, when goes back up can 
remember to take down the storm (2 states: on or off)
-
 (defn storm-active? [storm-cluster-state storm-name]
   (not-nil? (get-storm-id storm-cluster-state storm-name)))
 
@@ -1406,12 +1427,18 @@
         (doseq [id assigned-ids]
           (locking update-lock
             (let [orig-creds (.credentials storm-cluster-state id nil)
+                  storm-base (.storm-base storm-cluster-state id nil)
                   topology-conf (try-read-storm-conf (:conf nimbus) id 
blob-store)]
               (if orig-creds
                 (let [new-creds (HashMap. orig-creds)]
                   (doseq [renewer renewers]
                     (log-message "Renewing Creds For " id " with " renewer)
-                    (.renew renewer new-creds (Collections/unmodifiableMap 
topology-conf)))
+                    ;;Instead of trying to use reflection to make this work, 
let's just catch the error
+                    ;; when it does not work
+                    (try
+                      (.renew renewer new-creds (Collections/unmodifiableMap 
topology-conf) (:principal storm-base))
+                      (catch clojure.lang.ArityException e
+                        (.renew renewer new-creds (Collections/unmodifiableMap 
topology-conf)))))
                   (when-not (= orig-creds new-creds)
                     (.set-credentials! storm-cluster-state id new-creds 
topology-conf)
                     ))))))))
@@ -1676,9 +1703,11 @@
               submitter-user (.toLocal principal-to-local principal)
               system-user (System/getProperty "user.name")
               topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted 
TOPOLOGY-USERS) submitter-principal, submitter-user)))
+              submitter-principal (if submitter-principal submitter-principal 
"")
+              submitter-user (if submitter-user submitter-user system-user)
               storm-conf (-> storm-conf-submitted
-                             (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if 
submitter-principal submitter-principal ""))
-                             (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user 
submitter-user system-user)) ;Don't let the user set who we launch as
+                             (assoc TOPOLOGY-SUBMITTER-PRINCIPAL 
submitter-principal)
+                             (assoc TOPOLOGY-SUBMITTER-USER submitter-user) 
;Don't let the user set who we launch as
                              (assoc TOPOLOGY-USERS topo-acl)
                              (assoc STORM-ZOOKEEPER-SUPERACL (.get conf 
STORM-ZOOKEEPER-SUPERACL)))
               storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer 
conf)
@@ -1688,7 +1717,11 @@
               topology (normalize-topology total-storm-conf topology)
               storm-cluster-state (:storm-cluster-state nimbus)]
           (when credentials (doseq [nimbus-autocred-plugin 
(:nimbus-autocred-plugins nimbus)]
-                              (.populateCredentials nimbus-autocred-plugin 
credentials (Collections/unmodifiableMap storm-conf))))
+                              ;;Instead of using reflection, just recover from 
the error thrown when the method is not there
+                              (try
+                                (.populateCredentials ^INimbusCredentialPlugin 
nimbus-autocred-plugin ^Map credentials ^Map (Collections/unmodifiableMap 
storm-conf) ^String submitter-principal)
+                                (catch clojure.lang.ArityException e
+                                  (.populateCredentials 
^INimbusCredentialPlugin nimbus-autocred-plugin ^Map credentials ^Map 
(Collections/unmodifiableMap storm-conf))))))
           (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? 
submitter-user) (.isEmpty (.trim submitter-user))))
             (throw (AuthorizationException. "Could not determine the user to 
run this topology as.")))
           (system-topology! total-storm-conf topology) ;; this validates the 
structure of the topology
@@ -1717,7 +1750,7 @@
             (notify-topology-action-listener nimbus storm-name 
"submitTopology")
             (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE 
:inactive
                                             TopologyInitialStatus/ACTIVE 
:active}]
-              (start-storm nimbus storm-name storm-id 
(thrift-status->kw-status (.get_initial_status submitOptions))))))
+              (start-storm nimbus storm-name storm-id 
(thrift-status->kw-status (.get_initial_status submitOptions)) submitter-user 
submitter-principal))))
         (catch Throwable e
           (log-warn-error e "Topology submission exception. (topology name='" 
storm-name "')")
           (throw e))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 972d778..7f834fb 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -570,6 +570,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
         if (StringUtils.isBlank(newElems.get_owner())) {
             newElems.set_owner(stormBase.get_owner());
         }
+        if (StringUtils.isBlank(newElems.get_principal()) && 
stormBase.is_set_principal()) {
+            newElems.set_principal(stormBase.get_principal());
+        }
+
         if (newElems.get_topology_action_options() == null) {
             
newElems.set_topology_action_options(stormBase.get_topology_action_options());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 87d726f..2e0f1ee 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -102,13 +102,13 @@ public class AdvancedFSOps {
         }
         
         @Override
-        public void setupStormCodeDir(Map<String, Object> topologyConf, File 
path) throws IOException {
-            SupervisorUtils.setupStormCodeDir(_conf, topologyConf, 
path.getCanonicalPath());
+        public void setupStormCodeDir(String user, File path) throws 
IOException {
+            SupervisorUtils.setupStormCodeDir(_conf, user, 
path.getCanonicalPath());
         }
 
         @Override
-        public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, 
File path) throws IOException {
-            SupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, 
path.getCanonicalPath());
+        public void setupWorkerArtifactsDir(String user, File path) throws 
IOException {
+            SupervisorUtils.setupWorkerArtifactsDir(_conf, user, 
path.getCanonicalPath());
         }
     }
     
@@ -233,21 +233,21 @@ public class AdvancedFSOps {
 
     /**
      * Setup the permissions for the storm code dir
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) 
throws IOException {
+    public void setupStormCodeDir(String user, File path) throws IOException {
         //By default this is a NOOP
     }
 
     /**
      * Setup the permissions for the worker artifacts dirs
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File 
path) throws IOException {
+    public void setupWorkerArtifactsDir(String user, File path) throws 
IOException {
         //By default this is a NOOP
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index 859ccf1..345454c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -317,7 +317,7 @@ public abstract class Container implements Killable {
         File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
         if (!_ops.fileExists(workerArtifacts)) {
             _ops.forceMkdir(workerArtifacts);
-            _ops.setupWorkerArtifactsDir(_topoConf, workerArtifacts);
+            _ops.setupWorkerArtifactsDir(_assignment.get_owner(), 
workerArtifacts);
         }
     
         String user = getWorkerUser();
@@ -462,8 +462,8 @@ public abstract class Container implements Killable {
 
         if (_ops.fileExists(file)) {
             return _ops.slurpString(file).trim();
-        } else if (_topoConf != null) { 
-            return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        } else if (_assignment != null && _assignment.is_set_owner()) {
+            return _assignment.get_owner();
         }
         if (ConfigUtils.isLocalMode(_conf)) {
             return System.getProperty("user.name");

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 8fefe62..8857ef9 100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -276,6 +276,9 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                             if (slotsResources.containsKey(port)) {
                                 
localAssignment.set_resources(slotsResources.get(port));
                             }
+                            if (assignment.is_set_owner()) {
+                                
localAssignment.set_owner(assignment.get_owner());
+                            }
                             portTasks.put(port.intValue(), localAssignment);
                         }
                         List<ExecutorInfo> executorInfoList = 
localAssignment.get_executors();

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index a6adace..386beb5 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -208,8 +208,20 @@ public class Supervisor implements DaemonCommon, 
AutoCloseable {
         this.readState = new ReadClusterState(this);
         
         Set<String> downloadedTopoIds = 
SupervisorUtils.readDownloadedTopologyIds(conf);
-        for (String topoId : downloadedTopoIds) {
-            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        Map<Integer, LocalAssignment> portToAssignments = 
localState.getLocalAssignmentsMap();
+        if (portToAssignments != null) {
+            Map<String, LocalAssignment> assignments = new HashMap<>();
+            for (LocalAssignment la : 
localState.getLocalAssignmentsMap().values()) {
+                assignments.put(la.get_topology_id(), la);
+            }
+            for (String topoId : downloadedTopoIds) {
+                LocalAssignment la = assignments.get(topoId);
+                if (la != null) {
+                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, 
la.get_owner());
+                } else {
+                    LOG.warn("Could not find an owner for topo {}", topoId);
+                }
+            }
         }
         // do this after adding the references so we don't try to clean things 
being used
         localizer.startCleaner();

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 19d3b78..ef4b54d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -94,23 +94,23 @@ public class SupervisorUtils {
         return ret;
     }
 
-    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, 
Object> stormConf, String dir) throws IOException {
+    public static void setupStormCodeDir(Map<String, Object> conf, String 
user, String dir) throws IOException {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             String logPrefix = "Storm Code Dir Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("code-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 
-    public static void setupWorkerArtifactsDir(Map<String, Object> conf, 
Map<String, Object> stormConf, String dir) throws IOException {
+    public static void setupWorkerArtifactsDir(Map<String, Object> conf, 
String user, String dir) throws IOException {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             String logPrefix = "Worker Artifacts Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("artifacts-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 
@@ -161,10 +161,9 @@ public class SupervisorUtils {
      * @param stormId
      * @param conf
      */
-    static void addBlobReferences(Localizer localizer, String stormId, 
Map<String, Object> conf) throws IOException {
+    static void addBlobReferences(Localizer localizer, String stormId, 
Map<String, Object> conf, String user) throws IOException {
         Map<String, Object> stormConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
         List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         if (blobstoreMap != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index 0b6d996..746578e 100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.HashMap;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
@@ -59,15 +60,16 @@ public class UpdateBlobs implements Runnable {
             Map<String, Object> conf = supervisor.getConf();
             Set<String> downloadedStormIds = 
SupervisorUtils.readDownloadedTopologyIds(conf);
             AtomicReference<Map<Long, LocalAssignment>> newAssignment = 
supervisor.getCurrAssignment();
-            Set<String> assignedStormIds = new HashSet<>();
+            Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
             for (LocalAssignment localAssignment : 
newAssignment.get().values()) {
-                assignedStormIds.add(localAssignment.get_topology_id());
+                assignedStormIds.put(localAssignment.get_topology_id(), 
localAssignment);
             }
             for (String stormId : downloadedStormIds) {
-                if (assignedStormIds.contains(stormId)) {
+                LocalAssignment la = assignedStormIds.get(stormId);
+                if (la != null) {
                     String stormRoot = 
ConfigUtils.supervisorStormDistRoot(conf, stormId);
                     LOG.debug("Checking Blob updates for storm topology id {} 
With target_dir: {}", stormId, stormRoot);
-                    updateBlobsForTopology(conf, stormId, 
supervisor.getLocalizer());
+                    updateBlobsForTopology(conf, stormId, 
supervisor.getLocalizer(), la.get_owner());
                 }
             }
         } catch (Exception e) {
@@ -89,10 +91,9 @@ public class UpdateBlobs implements Runnable {
      * @param localizer
      * @throws IOException
      */
-    private void updateBlobsForTopology(Map conf, String stormId, Localizer 
localizer) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+    private void updateBlobsForTopology(Map<String, Object> conf, String 
stormId, Localizer localizer, String user) throws IOException {
+        Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         try {
             localizer.updateBlobs(localresources, user);

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java 
b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
index 4c973d5..394f934 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
@@ -60,6 +60,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   private static final org.apache.thrift.protocol.TField 
EXECUTOR_NODE_PORT_FIELD_DESC = new 
org.apache.thrift.protocol.TField("executor_node_port", 
org.apache.thrift.protocol.TType.MAP, (short)3);
   private static final org.apache.thrift.protocol.TField 
EXECUTOR_START_TIME_SECS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("executor_start_time_secs", 
org.apache.thrift.protocol.TType.MAP, (short)4);
   private static final org.apache.thrift.protocol.TField 
WORKER_RESOURCES_FIELD_DESC = new 
org.apache.thrift.protocol.TField("worker_resources", 
org.apache.thrift.protocol.TType.MAP, (short)5);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = 
new org.apache.thrift.protocol.TField("owner", 
org.apache.thrift.protocol.TType.STRING, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -72,6 +73,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   private Map<List<Long>,NodeInfo> executor_node_port; // optional
   private Map<List<Long>,Long> executor_start_time_secs; // optional
   private Map<NodeInfo,WorkerResources> worker_resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -79,7 +81,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     NODE_HOST((short)2, "node_host"),
     EXECUTOR_NODE_PORT((short)3, "executor_node_port"),
     EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs"),
-    WORKER_RESOURCES((short)5, "worker_resources");
+    WORKER_RESOURCES((short)5, "worker_resources"),
+    OWNER((short)7, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -104,6 +107,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           return EXECUTOR_START_TIME_SECS;
         case 5: // WORKER_RESOURCES
           return WORKER_RESOURCES;
+        case 7: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -144,7 +149,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = 
{_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES};
+  private static final _Fields optionals[] = 
{_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -168,6 +173,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 NodeInfo.class), 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 WorkerResources.class))));
+    tmpMap.put(_Fields.OWNER, new 
org.apache.thrift.meta_data.FieldMetaData("owner", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class,
 metaDataMap);
   }
@@ -246,6 +253,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       this.worker_resources = __this__worker_resources;
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public Assignment deepCopy() {
@@ -263,6 +273,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
 
     this.worker_resources = new HashMap<NodeInfo,WorkerResources>();
 
+    this.owner = null;
   }
 
   public String get_master_code_dir() {
@@ -424,6 +435,29 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false 
otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case MASTER_CODE_DIR:
@@ -466,6 +500,14 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -486,6 +528,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     case WORKER_RESOURCES:
       return get_worker_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -507,6 +552,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       return is_set_executor_start_time_secs();
     case WORKER_RESOURCES:
       return is_set_worker_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -569,6 +616,15 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -601,6 +657,11 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     if (present_worker_resources)
       list.add(worker_resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -662,6 +723,16 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, 
other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -729,6 +800,16 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -887,6 +968,14 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 7: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -979,6 +1068,13 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1010,7 +1106,10 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_worker_resources()) {
         optionals.set(3);
       }
-      oprot.writeBitSet(optionals, 4);
+      if (struct.is_set_owner()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
@@ -1063,6 +1162,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           }
         }
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -1070,7 +1172,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.master_code_dir = iprot.readString();
       struct.set_master_code_dir_isSet(true);
-      BitSet incoming = iprot.readBitSet(4);
+      BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map652 = new 
org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, 
org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1152,6 +1254,10 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         }
         struct.set_worker_resources_isSet(true);
       }
+      if (incoming.get(4)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java 
b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
index b48d342..5071499 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
@@ -58,6 +58,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   private static final org.apache.thrift.protocol.TField 
TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", 
org.apache.thrift.protocol.TType.STRING, (short)1);
   private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC 
= new org.apache.thrift.protocol.TField("executors", 
org.apache.thrift.protocol.TType.LIST, (short)2);
   private static final org.apache.thrift.protocol.TField RESOURCES_FIELD_DESC 
= new org.apache.thrift.protocol.TField("resources", 
org.apache.thrift.protocol.TType.STRUCT, (short)3);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = 
new org.apache.thrift.protocol.TField("owner", 
org.apache.thrift.protocol.TType.STRING, (short)5);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -68,12 +69,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   private String topology_id; // required
   private List<ExecutorInfo> executors; // required
   private WorkerResources resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     TOPOLOGY_ID((short)1, "topology_id"),
     EXECUTORS((short)2, "executors"),
-    RESOURCES((short)3, "resources");
+    RESOURCES((short)3, "resources"),
+    OWNER((short)5, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -94,6 +97,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
           return EXECUTORS;
         case 3: // RESOURCES
           return RESOURCES;
+        case 5: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -134,7 +139,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.RESOURCES};
+  private static final _Fields optionals[] = {_Fields.RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -145,6 +150,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 ExecutorInfo.class))));
     tmpMap.put(_Fields.RESOURCES, new 
org.apache.thrift.meta_data.FieldMetaData("resources", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 WorkerResources.class)));
+    tmpMap.put(_Fields.OWNER, new 
org.apache.thrift.meta_data.FieldMetaData("owner", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class,
 metaDataMap);
   }
@@ -178,6 +185,9 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     if (other.is_set_resources()) {
       this.resources = new WorkerResources(other.resources);
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public LocalAssignment deepCopy() {
@@ -189,6 +199,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     this.topology_id = null;
     this.executors = null;
     this.resources = null;
+    this.owner = null;
   }
 
   public String get_topology_id() {
@@ -275,6 +286,29 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false 
otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case TOPOLOGY_ID:
@@ -301,6 +335,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -315,6 +357,9 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     case RESOURCES:
       return get_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -332,6 +377,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       return is_set_executors();
     case RESOURCES:
       return is_set_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -376,6 +423,15 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -398,6 +454,11 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     if (present_resources)
       list.add(resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -439,6 +500,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, 
other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -484,6 +555,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -574,6 +655,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 5: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -611,6 +700,13 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -640,10 +736,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       if (struct.is_set_resources()) {
         optionals.set(0);
       }
-      oprot.writeBitSet(optionals, 1);
+      if (struct.is_set_owner()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
       if (struct.is_set_resources()) {
         struct.resources.write(oprot);
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -663,12 +765,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         }
       }
       struct.set_executors_isSet(true);
-      BitSet incoming = iprot.readBitSet(1);
+      BitSet incoming = iprot.readBitSet(2);
       if (incoming.get(0)) {
         struct.resources = new WorkerResources();
         struct.resources.read(iprot);
         struct.set_resources_isSet(true);
       }
+      if (incoming.get(1)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java 
b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
index 34b2358..e3114fb 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
@@ -64,6 +64,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private static final org.apache.thrift.protocol.TField 
TOPOLOGY_ACTION_OPTIONS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("topology_action_options", 
org.apache.thrift.protocol.TType.STRUCT, (short)7);
   private static final org.apache.thrift.protocol.TField 
PREV_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("prev_status", 
org.apache.thrift.protocol.TType.I32, (short)8);
   private static final org.apache.thrift.protocol.TField 
COMPONENT_DEBUG_FIELD_DESC = new 
org.apache.thrift.protocol.TField("component_debug", 
org.apache.thrift.protocol.TType.MAP, (short)9);
+  private static final org.apache.thrift.protocol.TField PRINCIPAL_FIELD_DESC 
= new org.apache.thrift.protocol.TField("principal", 
org.apache.thrift.protocol.TType.STRING, (short)10);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -80,6 +81,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private TopologyActionOptions topology_action_options; // optional
   private TopologyStatus prev_status; // optional
   private Map<String,DebugOptions> component_debug; // optional
+  private String principal; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -99,7 +101,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
      * @see TopologyStatus
      */
     PREV_STATUS((short)8, "prev_status"),
-    COMPONENT_DEBUG((short)9, "component_debug");
+    COMPONENT_DEBUG((short)9, "component_debug"),
+    PRINCIPAL((short)10, "principal");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -132,6 +135,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           return PREV_STATUS;
         case 9: // COMPONENT_DEBUG
           return COMPONENT_DEBUG;
+        case 10: // PRINCIPAL
+          return PRINCIPAL;
         default:
           return null;
       }
@@ -175,7 +180,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private static final int __NUM_WORKERS_ISSET_ID = 0;
   private static final int __LAUNCH_TIME_SECS_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = 
{_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG};
+  private static final _Fields optionals[] = 
{_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG,_Fields.PRINCIPAL};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -201,6 +206,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 DebugOptions.class))));
+    tmpMap.put(_Fields.PRINCIPAL, new 
org.apache.thrift.meta_data.FieldMetaData("principal", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormBase.class, 
metaDataMap);
   }
@@ -261,6 +268,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       this.component_debug = __this__component_debug;
     }
+    if (other.is_set_principal()) {
+      this.principal = other.principal;
+    }
   }
 
   public StormBase deepCopy() {
@@ -280,6 +290,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     this.topology_action_options = null;
     this.prev_status = null;
     this.component_debug = null;
+    this.principal = null;
   }
 
   public String get_name() {
@@ -525,6 +536,29 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     }
   }
 
+  public String get_principal() {
+    return this.principal;
+  }
+
+  public void set_principal(String principal) {
+    this.principal = principal;
+  }
+
+  public void unset_principal() {
+    this.principal = null;
+  }
+
+  /** Returns true if field principal is set (has been assigned a value) and 
false otherwise */
+  public boolean is_set_principal() {
+    return this.principal != null;
+  }
+
+  public void set_principal_isSet(boolean value) {
+    if (!value) {
+      this.principal = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case NAME:
@@ -599,6 +633,14 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       break;
 
+    case PRINCIPAL:
+      if (value == null) {
+        unset_principal();
+      } else {
+        set_principal((String)value);
+      }
+      break;
+
     }
   }
 
@@ -631,6 +673,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     case COMPONENT_DEBUG:
       return get_component_debug();
 
+    case PRINCIPAL:
+      return get_principal();
+
     }
     throw new IllegalStateException();
   }
@@ -660,6 +705,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       return is_set_prev_status();
     case COMPONENT_DEBUG:
       return is_set_component_debug();
+    case PRINCIPAL:
+      return is_set_principal();
     }
     throw new IllegalStateException();
   }
@@ -758,6 +805,15 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         return false;
     }
 
+    boolean this_present_principal = true && this.is_set_principal();
+    boolean that_present_principal = true && that.is_set_principal();
+    if (this_present_principal || that_present_principal) {
+      if (!(this_present_principal && that_present_principal))
+        return false;
+      if (!this.principal.equals(that.principal))
+        return false;
+    }
+
     return true;
   }
 
@@ -810,6 +866,11 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     if (present_component_debug)
       list.add(component_debug);
 
+    boolean present_principal = true && (is_set_principal());
+    list.add(present_principal);
+    if (present_principal)
+      list.add(principal);
+
     return list.hashCode();
   }
 
@@ -911,6 +972,16 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_principal()).compareTo(other.is_set_principal());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_principal()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.principal, 
other.principal);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1006,6 +1077,16 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       first = false;
     }
+    if (is_set_principal()) {
+      if (!first) sb.append(", ");
+      sb.append("principal:");
+      if (this.principal == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.principal);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1161,6 +1242,14 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 10: // PRINCIPAL
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.principal = iprot.readString();
+              struct.set_principal_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -1243,6 +1332,13 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           oprot.writeFieldEnd();
         }
       }
+      if (struct.principal != null) {
+        if (struct.is_set_principal()) {
+          oprot.writeFieldBegin(PRINCIPAL_FIELD_DESC);
+          oprot.writeString(struct.principal);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1282,7 +1378,10 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       if (struct.is_set_component_debug()) {
         optionals.set(5);
       }
-      oprot.writeBitSet(optionals, 6);
+      if (struct.is_set_principal()) {
+        optionals.set(6);
+      }
+      oprot.writeBitSet(optionals, 7);
       if (struct.is_set_component_executors()) {
         {
           oprot.writeI32(struct.component_executors.size());
@@ -1315,6 +1414,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           }
         }
       }
+      if (struct.is_set_principal()) {
+        oprot.writeString(struct.principal);
+      }
     }
 
     @Override
@@ -1326,7 +1428,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       struct.set_status_isSet(true);
       struct.num_workers = iprot.readI32();
       struct.set_num_workers_isSet(true);
-      BitSet incoming = iprot.readBitSet(6);
+      BitSet incoming = iprot.readBitSet(7);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map686 = new 
org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, 
org.apache.thrift.protocol.TType.I32, iprot.readI32());
@@ -1375,6 +1477,10 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         }
         struct.set_component_debug_isSet(true);
       }
+      if (incoming.get(6)) {
+        struct.principal = iprot.readString();
+        struct.set_principal_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
index ae5103f..2a9ec24 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -101,10 +101,12 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     private class DownloadBaseBlobsDistributed implements Callable<Void> {
         protected final String _topologyId;
         protected final File _stormRoot;
-        
-        public DownloadBaseBlobsDistributed(String topologyId) throws 
IOException {
+        protected final String owner;
+
+        public DownloadBaseBlobsDistributed(String topologyId, String owner) 
throws IOException {
             _topologyId = topologyId;
             _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId));
+            this.owner = owner;
         }
         
         protected void downloadBaseBlobs(File tmproot) throws Exception {
@@ -145,7 +147,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                 try {
                     downloadBaseBlobs(tr);
                     _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-                    
_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, 
_topologyId), _stormRoot);
+                    _fsOps.setupStormCodeDir(owner, _stormRoot);
                     deleteAll = false;
                 } finally {
                     if (deleteAll) {
@@ -164,8 +166,8 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     
     private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
 
-        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
-            super(topologyId);
+        public DownloadBaseBlobsLocal(String topologyId, String owner) throws 
IOException {
+            super(topologyId, owner);
         }
         
         @Override
@@ -210,21 +212,22 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     
     private class DownloadBlobs implements Callable<Void> {
         private final String _topologyId;
+        private final String topoOwner;
 
-        public DownloadBlobs(String topologyId) {
+        public DownloadBlobs(String topologyId, String topoOwner) {
             _topologyId = topologyId;
+            this.topoOwner = topoOwner;
         }
 
         @Override
         public Void call() throws Exception {
             try {
                 String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId);
-                Map<String, Object> stormConf = 
ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+                Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
 
                 @SuppressWarnings("unchecked")
-                Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-                String user = (String) 
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-                String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+                Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+                String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
 
                 List<LocalResource> localResourceList = new ArrayList<>();
                 if (blobstoreMap != null) {
@@ -247,12 +250,12 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                 }
 
                 if (!localResourceList.isEmpty()) {
-                    File userDir = _localizer.getLocalUserFileCacheDir(user);
+                    File userDir = 
_localizer.getLocalUserFileCacheDir(topoOwner);
                     if (!_fsOps.fileExists(userDir)) {
                         _fsOps.forceMkdir(userDir);
                     }
-                    List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir);
-                    _fsOps.setupBlobPermissions(userDir, user);
+                    List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
+                    _fsOps.setupBlobPermissions(userDir, topoOwner);
                     if (!_symlinksDisabled) {
                         for (LocalizedResource localizedResource : 
localizedResources) {
                             String keyName = localizedResource.getKey();
@@ -310,9 +313,9 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
         if (localResource == null) {
             Callable<Void> c;
             if (_isLocalMode) {
-                c = new DownloadBaseBlobsLocal(topologyId);
+                c = new DownloadBaseBlobsLocal(topologyId, 
assignment.get_owner());
             } else {
-                c = new DownloadBaseBlobsDistributed(topologyId);
+                c = new DownloadBaseBlobsDistributed(topologyId, 
assignment.get_owner());
             }
             localResource = new 
LocalDownloadedResource(_execService.submit(c));
             _basicPending.put(topologyId, localResource);
@@ -363,7 +366,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
         final String topologyId = assignment.get_topology_id();
         LocalDownloadedResource localResource = _blobPending.get(topologyId);
         if (localResource == null) {
-            Callable<Void> c = new DownloadBlobs(topologyId);
+            Callable<Void> c = new DownloadBlobs(topologyId, 
assignment.get_owner());
             localResource = new 
LocalDownloadedResource(_execService.submit(c));
             _blobPending.put(topologyId, localResource);
         }
@@ -386,7 +389,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
             @SuppressWarnings("unchecked")
             Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
             if (blobstoreMap != null) {
-                String user = (String) 
topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String user = assignment.get_owner();
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
                 
                 for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java 
b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
index a0eb4ad..a394e39 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.ArrayList;
@@ -37,13 +38,12 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TopologyDetails {
-    private String topologyId;
-    private Map topologyConf;
-    private StormTopology topology;
-    private Map<ExecutorDetails, String> executorToComponent;
-    private int numWorkers;
+    private final String topologyId;
+    private final Map topologyConf;
+    private final StormTopology topology;
+    private final Map<ExecutorDetails, String> executorToComponent;
+    private final int numWorkers;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - 
type of that resource, Double - amount>>>
     private Map<ExecutorDetails, Map<String, Double>> resourceList;
     //Max heap size for a worker used by topology
@@ -51,21 +51,23 @@ public class TopologyDetails {
     //topology priority
     private Integer topologyPriority;
     //when topology was launched
-    private int launchTime;
+    private final int launchTime;
+    private final String owner;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TopologyDetails.class);
 
-    public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology, int numWorkers) {
-        this(topologyId, topologyConf, topology,  numWorkers,  null, 0);
+    public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology, int numWorkers, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  null, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology,
-                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents) {
-        this(topologyId, topologyConf, topology,  numWorkers,  
executorToComponents, 0);
+                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  
executorToComponents, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology,
-                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, int launchTime) {
+                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, int launchTime, String owner) {
+        this.owner = owner;
         this.topologyId = topologyId;
         this.topologyConf = topologyConf;
         this.topology = topology;
@@ -464,12 +466,7 @@ public class TopologyDetails {
      * Get the user that submitted this topology
      */
     public String getTopologySubmitter() {
-        String user = (String) 
this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        if (user == null || user.equals("")) {
-            LOG.debug("Topology {} submitted by anonymous user", 
this.getName());
-            user = System.getProperty("user.name");
-        }
-        return user;
+        return owner;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
 
b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 60a5259..01cfb91 100644
--- 
a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ 
b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -79,7 +79,7 @@ public class MultitenantScheduler implements IScheduler {
     defaultPool.init(cluster, nodeIdToNode);
     
     for (TopologyDetails td: topologies.getTopologies()) {
-      String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
+      String user = td.getTopologySubmitter();
       LOG.debug("Found top {} run by user {}",td.getId(), user);
       NodePool pool = userPools.get(user);
       if (pool == null || !pool.canAdd(td)) {

Reply via email to