User improvements

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbf7b998
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbf7b998
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbf7b998

Branch: refs/heads/master
Commit: dbf7b998c04efb622f4d6ddc1ad4ae9d382e07a5
Parents: 2aba1c4
Author: Robert Evans <[email protected]>
Authored: Fri Jul 21 12:31:23 2017 -0500
Committer: Robert Evans <[email protected]>
Committed: Fri Jul 21 12:31:23 2017 -0500

----------------------------------------------------------------------
 .../AbstractHadoopNimbusPluginAutoCreds.java    |  14 +-
 .../apache/storm/hbase/security/AutoHBase.java  |   3 +-
 .../storm/hbase/security/AutoHBaseCommand.java  |   5 +-
 .../storm/hbase/security/AutoHBaseNimbus.java   |  16 +-
 .../apache/storm/hdfs/security/AutoHDFS.java    |   3 +-
 .../storm/hdfs/security/AutoHDFSCommand.java    |   5 +-
 .../storm/hdfs/security/AutoHDFSNimbus.java     |  16 +-
 .../apache/storm/hive/security/AutoHive.java    |   3 +-
 .../storm/hive/security/AutoHiveCommand.java    |   5 +-
 .../storm/hive/security/AutoHiveNimbus.java     |  13 +-
 .../storm/cluster/StormClusterStateImpl.java    |   4 +
 .../storm/daemon/supervisor/AdvancedFSOps.java  |  16 +-
 .../supervisor/ClientSupervisorUtils.java       |   8 +-
 .../storm/daemon/supervisor/IAdvancedFSOps.java |  10 +-
 .../org/apache/storm/generated/Assignment.java  | 114 +++++++++++-
 .../apache/storm/generated/LocalAssignment.java | 114 +++++++++++-
 .../org/apache/storm/generated/StormBase.java   | 114 +++++++++++-
 .../apache/storm/scheduler/TopologyDetails.java |  35 ++--
 .../storm/security/INimbusCredentialPlugin.java |  25 ++-
 .../security/auth/ICredentialsRenewer.java      |  20 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |   2 +-
 storm-client/src/py/storm/ttypes.py             |  47 ++++-
 storm-client/src/storm.thrift                   |   5 +
 .../storm/security/auth/AuthUtilsTestMock.java  |   4 +-
 .../scheduler/multitenant_scheduler_test.clj    |  77 ++++----
 .../clj/org/apache/storm/scheduler_test.clj     |  10 +-
 .../test/jvm/org/apache/storm/MockAutoCred.java |   4 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  61 +++++--
 .../storm/daemon/supervisor/BasicContainer.java |   2 +-
 .../storm/daemon/supervisor/Container.java      |   6 +-
 .../daemon/supervisor/ReadClusterState.java     |   3 +
 .../storm/daemon/supervisor/Supervisor.java     |  16 +-
 .../daemon/supervisor/SupervisorUtils.java      |   7 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |  13 +-
 .../apache/storm/localizer/AsyncLocalizer.java  |  31 ++--
 .../multitenant/MultitenantScheduler.java       |   2 +-
 .../java/org/apache/storm/MessagingTest.java    |   3 -
 .../storm/daemon/supervisor/ContainerTest.java  |   6 +-
 .../storm/localizer/AsyncLocalizerTest.java     |   8 +-
 .../resource/TestResourceAwareScheduler.java    | 183 ++++++++++---------
 .../storm/scheduler/resource/TestUser.java      |   7 +-
 .../TestUtilsForResourceAwareScheduler.java     |  32 ++--
 .../eviction/TestDefaultEvictionStrategy.java   | 118 ++++++------
 .../TestDefaultResourceAwareStrategy.java       |   8 +-
 44 files changed, 819 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
index 89ec6f5..6f76b9f 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
@@ -49,7 +49,7 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topologyConf) {
+    public void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topologyConf, final String topologyOwnerPrincipal) {
         try {
             List<String> configKeys = getConfigKeys(topologyConf);
             if (!configKeys.isEmpty()) {
@@ -59,7 +59,7 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds
                 }
             } else {
                 credentials.put(getCredentialKey(StringUtils.EMPTY),
-                        
DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf)));
+                        
DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf, 
topologyOwnerPrincipal)));
             }
             LOG.info("Tokens added to credentials map.");
         } catch (Exception e) {
@@ -68,8 +68,8 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds
     }
 
     @Override
-    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf) {
-        doRenew(credentials, topologyConf);
+    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, final String topologyOwnerPrincipal) {
+        doRenew(credentials, topologyConf, topologyOwnerPrincipal);
     }
 
     protected Set<Pair<String, Credentials>> getCredentials(Map<String, 
String> credentials,
@@ -113,11 +113,11 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds
      */
     protected abstract String getConfigKeyString();
 
-    protected abstract byte[] getHadoopCredentials(Map topologyConf, String 
configKey);
+    protected abstract byte[] getHadoopCredentials(Map topologyConf, String 
configKey, final String topologyOwnerPrincipal);
 
-    protected abstract byte[] getHadoopCredentials(Map topologyConf);
+    protected abstract byte[] getHadoopCredentials(Map topologyConf, final 
String topologyOwnerPrincipal);
 
-    protected abstract void doRenew(Map<String, String> credentials, Map 
topologyConf);
+    protected abstract void doRenew(Map<String, String> credentials, Map 
topologyConf, final String topologyOwnerPrincipal);
 
     protected List<String> getConfigKeys(Map conf) {
         String configKeyString = getConfigKeyString();

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index 0218be5..5c3d0ce 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -42,4 +42,5 @@ public class AutoHBase extends AbstractHadoopAutoCreds {
     public String getCredentialKey(String configKey) {
         return HBASE_CREDENTIALS + configKey;
     }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
index b239816..e9e2a83 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
@@ -40,7 +40,6 @@ public final class AutoHBaseCommand {
   @SuppressWarnings("unchecked")
   public static void main(String[] args) throws Exception {
     Map conf = new HashMap();
-    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. 
[email protected]
     conf.put(HBASE_PRINCIPAL_KEY, args[1]); // hbase principal 
[email protected]
     conf.put(HBASE_KEYTAB_FILE_KEY,
         args[2]); // storm hbase keytab 
/etc/security/keytabs/storm-hbase.keytab
@@ -51,14 +50,14 @@ public final class AutoHBaseCommand {
     autoHBaseNimbus.prepare(conf);
 
     Map<String, String> creds = new HashMap<>();
-    autoHBaseNimbus.populateCredentials(creds, conf);
+    autoHBaseNimbus.populateCredentials(creds, conf, args[0]); //with realm 
e.g. [email protected]
     LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
 
     Subject s = new Subject();
     autoHBase.populateSubject(s, creds);
     LOG.info("Got a Subject " + s);
 
-    autoHBaseNimbus.renew(creds, conf);
+    autoHBaseNimbus.renew(creds, conf, args[0]);
     LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
index ec85135..bd1e03a 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
@@ -66,14 +66,14 @@ public class AutoHBaseNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @Override
-    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+    protected  byte[] getHadoopCredentials(Map conf, String configKey, final 
String topologyOwnerPrincipal) {
         Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
+        return getHadoopCredentials(conf, configuration, 
topologyOwnerPrincipal);
     }
 
     @Override
-    protected byte[] getHadoopCredentials(Map conf) {
-        return getHadoopCredentials(conf, HBaseConfiguration.create());
+    protected byte[] getHadoopCredentials(Map conf, final String 
topologyOwnerPrincipal) {
+        return getHadoopCredentials(conf, HBaseConfiguration.create(), 
topologyOwnerPrincipal);
     }
 
     private Configuration getHadoopConfiguration(Map topoConf, String 
configKey) {
@@ -83,11 +83,9 @@ public class AutoHBaseNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf) {
+    protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf, 
final String topologySubmitterUser) {
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) 
conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 UserProvider provider = UserProvider.instantiate(hbaseConf);
                 provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, 
InetAddress.getLocalHost().getCanonicalHostName());
 
@@ -130,9 +128,9 @@ public class AutoHBaseNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @Override
-    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+    public void doRenew(Map<String, String> credentials, Map topologyConf, 
final String topologySubmitterUser) {
         //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
+        populateCredentials(credentials, topologyConf, topologySubmitterUser);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
index 4c2cb62..1af47d8 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
@@ -42,4 +42,5 @@ public class AutoHDFS extends AbstractHadoopAutoCreds {
     public String getCredentialKey(String configKey) {
         return HDFS_CREDENTIALS + configKey;
     }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
index 803e625..5f56d61 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
@@ -40,7 +40,6 @@ public final class AutoHDFSCommand {
   @SuppressWarnings("unchecked")
   public static void main(String[] args) throws Exception {
     Map conf = new HashMap();
-    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. 
[email protected]
     conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. [email protected]
     conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// 
/etc/security/keytabs/storm.keytab
 
@@ -50,14 +49,14 @@ public final class AutoHDFSCommand {
     autoHDFSNimbus.prepare(conf);
 
     Map<String,String> creds  = new HashMap<>();
-    autoHDFSNimbus.populateCredentials(creds, conf);
+    autoHDFSNimbus.populateCredentials(creds, conf, args[0]);
     LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
 
     Subject s = new Subject();
     autoHDFS.populateSubject(s, creds);
     LOG.info("Got a Subject "+ s);
 
-    autoHDFSNimbus.renew(creds, conf);
+    autoHDFSNimbus.renew(creds, conf, args[0]);
     LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
index 78aef12..99b4ba8 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
@@ -74,14 +74,14 @@ public class AutoHDFSNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @Override
-    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+    protected  byte[] getHadoopCredentials(Map conf, String configKey, final 
String topologyOwnerPrincipal) {
         Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
+        return getHadoopCredentials(conf, configuration, 
topologyOwnerPrincipal);
     }
 
     @Override
-    protected byte[] getHadoopCredentials(Map conf) {
-        return getHadoopCredentials(conf, new Configuration());
+    protected byte[] getHadoopCredentials(Map conf, final String 
topologyOwnerPrincipal) {
+        return getHadoopCredentials(conf, new Configuration(), 
topologyOwnerPrincipal);
     }
 
     private Configuration getHadoopConfiguration(Map topoConf, String 
configKey) {
@@ -91,13 +91,11 @@ public class AutoHDFSNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @SuppressWarnings("unchecked")
-    private byte[] getHadoopCredentials(Map conf, final Configuration 
configuration) {
+    private byte[] getHadoopCredentials(Map conf, final Configuration 
configuration, final String topologySubmitterUser) {
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
                 login(configuration);
 
-                final String topologySubmitterUser = (String) 
conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? 
new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
 
@@ -147,7 +145,7 @@ public class AutoHDFSNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
      */
     @Override
     @SuppressWarnings("unchecked")
-    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+    public void doRenew(Map<String, String> credentials, Map topologyConf, 
final String topologyOwnerPrincipal) {
         List<String> confKeys = getConfigKeys(topologyConf);
         for (Pair<String, Credentials> cred : getCredentials(credentials, 
confKeys)) {
             try {
@@ -168,7 +166,7 @@ public class AutoHDFSNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
             } catch (Exception e) {
                 LOG.warn("could not renew the credentials, one of the possible 
reason is tokens are beyond " +
                         "renewal period so attempting to get new tokens.", e);
-                populateCredentials(credentials, topologyConf);
+                populateCredentials(credentials, topologyConf, 
topologyOwnerPrincipal);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
index d88f197..6010dd1 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
@@ -44,4 +44,5 @@ public class AutoHive extends AbstractHadoopAutoCreds {
         return HIVE_CREDENTIALS + configKey;
     }
 
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
index 7aded11..9009a53 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
@@ -41,7 +41,6 @@ public final class AutoHiveCommand {
   @SuppressWarnings("unchecked")
   public static void main(String[] args) throws Exception {
     Map conf = new HashMap();
-    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. 
[email protected]
     conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal 
[email protected]
     conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab 
/etc/security/keytabs/storm-hive.keytab
     conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]); // 
hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083"
@@ -52,14 +51,14 @@ public final class AutoHiveCommand {
     autoHiveNimbus.prepare(conf);
 
     Map<String, String> creds = new HashMap<>();
-    autoHiveNimbus.populateCredentials(creds, conf);
+    autoHiveNimbus.populateCredentials(creds, conf, args[0]);
     LOG.info("Got Hive credentials" + autoHive.getCredentials(creds));
 
     Subject subject = new Subject();
     autoHive.populateSubject(subject, creds);
     LOG.info("Got a Subject " + subject);
 
-    autoHiveNimbus.renew(creds, conf);
+    autoHiveNimbus.renew(creds, conf, args[0]);
     LOG.info("Renewed credentials" + autoHive.getCredentials(creds));
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
----------------------------------------------------------------------
diff --git 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
index 5d76018..b60e15a 100644
--- 
a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
+++ 
b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
@@ -79,15 +79,15 @@ public class AutoHiveNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @Override
-    protected byte[] getHadoopCredentials(Map conf, String configKey) {
+    protected byte[] getHadoopCredentials(Map conf, String configKey, final 
String topologyOwnerPrincipal) {
         Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
+        return getHadoopCredentials(conf, configuration, 
topologyOwnerPrincipal);
     }
 
     @Override
-    protected byte[] getHadoopCredentials(Map conf) {
+    protected byte[] getHadoopCredentials(Map conf, final String 
topologyOwnerPrincipal) {
         Configuration configuration = new Configuration();
-        return getHadoopCredentials(conf, configuration);
+        return getHadoopCredentials(conf, configuration, 
topologyOwnerPrincipal);
     }
 
     private Configuration getHadoopConfiguration(Map topoConf, String 
configKey) {
@@ -107,10 +107,9 @@ public class AutoHiveNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf, final Configuration 
configuration) {
+    protected byte[] getHadoopCredentials(Map conf, final Configuration 
configuration, final String topologySubmitterUser) {
         try {
             if (UserGroupInformation.isSecurityEnabled()) {
-                String topologySubmitterUser = (String) 
conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
                 String hiveMetaStoreURI = getMetaStoreURI(configuration);
                 String hiveMetaStorePrincipal = 
getMetaStorePrincipal(configuration);
                 HiveConf hcatConf = createHiveConf(hiveMetaStoreURI, 
hiveMetaStorePrincipal);
@@ -193,7 +192,7 @@ public class AutoHiveNimbus extends 
AbstractHadoopNimbusPluginAutoCreds {
     }
 
     @Override
-    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+    public void doRenew(Map<String, String> credentials, Map topologyConf, 
final String topologyOwnerPrincipal) {
         List<String> configKeys = getConfigKeys(topologyConf);
         for (Pair<String, Credentials> cred : getCredentials(credentials, 
configKeys)) {
             try {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 1655323..1357584 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -567,6 +567,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
         if (StringUtils.isBlank(newElems.get_owner())) {
             newElems.set_owner(stormBase.get_owner());
         }
+        if (StringUtils.isBlank(newElems.get_principal()) && 
stormBase.is_set_principal()) {
+            newElems.set_principal(stormBase.get_principal());
+        }
+
         if (newElems.get_topology_action_options() == null) {
             
newElems.set_topology_action_options(stormBase.get_topology_action_options());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java 
b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index ba7f9db..04c4496 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -101,13 +101,13 @@ public class AdvancedFSOps implements IAdvancedFSOps {
         }
         
         @Override
-        public void setupStormCodeDir(Map<String, Object> topologyConf, File 
path) throws IOException {
-            ClientSupervisorUtils.setupStormCodeDir(_conf, topologyConf, 
path.getCanonicalPath());
+        public void setupStormCodeDir(String user, File path) throws 
IOException {
+            ClientSupervisorUtils.setupStormCodeDir(_conf, user, 
path.getCanonicalPath());
         }
 
         @Override
-        public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, 
File path) throws IOException {
-            ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, 
path.getCanonicalPath());
+        public void setupWorkerArtifactsDir(String user, File path) throws 
IOException {
+            ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, user, 
path.getCanonicalPath());
         }
     }
     
@@ -232,21 +232,21 @@ public class AdvancedFSOps implements IAdvancedFSOps {
 
     /**
      * Setup the permissions for the storm code dir
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) 
throws IOException {
+    public void setupStormCodeDir(String user, File path) throws IOException {
         //By default this is a NOOP
     }
 
     /**
      * Setup the permissions for the worker artifacts dirs
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File 
path) throws IOException {
+    public void setupWorkerArtifactsDir(String user, File path) throws 
IOException {
         //By default this is a NOOP
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
 
b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index b898aee..29e75ed 100644
--- 
a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ 
b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -145,23 +145,23 @@ public class ClientSupervisorUtils {
         return process;
     }
 
-    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, 
Object> topoConf, String dir) throws IOException {
+    public static void setupStormCodeDir(Map<String, Object> conf, String 
user, String dir) throws IOException {
         if 
(ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             String logPrefix = "Storm Code Dir Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("code-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) 
(topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 
-    public static void setupWorkerArtifactsDir(Map<String, Object> conf, 
Map<String, Object> topoConf, String dir) throws IOException {
+    public static void setupWorkerArtifactsDir(Map<String, Object> conf, 
String user, String dir) throws IOException {
         if 
(ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             String logPrefix = "Worker Artifacts Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("artifacts-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) 
(topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java 
b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
index e5f5db0..5f23774 100644
--- 
a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
+++ 
b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
@@ -83,19 +83,19 @@ public interface IAdvancedFSOps {
 
     /**
      * Setup the permissions for the storm code dir
-     * @param topologyConf the config of the Topology
+     * @param user the owner of the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws 
IOException;
+    void setupStormCodeDir(String user, File path) throws IOException;
 
     /**
      * Setup the permissions for the worker artifacts dirs
-     * @param topologyConf the config of the Topology
+     * @param user the owner of the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) 
throws IOException;
+    void setupWorkerArtifactsDir(String user, File path) throws IOException;
 
     /**
      * Sanity check if everything the topology needs is there for it to run.
@@ -157,7 +157,7 @@ public interface IAdvancedFSOps {
 
     /**
      * Read the contents of a file into a byte array.
-     * @param localtion the file to read
+     * @param location the file to read
      * @return the contents of the file
      * @throws IOException on any error
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/Assignment.java 
b/storm-client/src/jvm/org/apache/storm/generated/Assignment.java
index 4c973d5..394f934 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/Assignment.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/Assignment.java
@@ -60,6 +60,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   private static final org.apache.thrift.protocol.TField 
EXECUTOR_NODE_PORT_FIELD_DESC = new 
org.apache.thrift.protocol.TField("executor_node_port", 
org.apache.thrift.protocol.TType.MAP, (short)3);
   private static final org.apache.thrift.protocol.TField 
EXECUTOR_START_TIME_SECS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("executor_start_time_secs", 
org.apache.thrift.protocol.TType.MAP, (short)4);
   private static final org.apache.thrift.protocol.TField 
WORKER_RESOURCES_FIELD_DESC = new 
org.apache.thrift.protocol.TField("worker_resources", 
org.apache.thrift.protocol.TType.MAP, (short)5);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = 
new org.apache.thrift.protocol.TField("owner", 
org.apache.thrift.protocol.TType.STRING, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -72,6 +73,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   private Map<List<Long>,NodeInfo> executor_node_port; // optional
   private Map<List<Long>,Long> executor_start_time_secs; // optional
   private Map<NodeInfo,WorkerResources> worker_resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -79,7 +81,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     NODE_HOST((short)2, "node_host"),
     EXECUTOR_NODE_PORT((short)3, "executor_node_port"),
     EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs"),
-    WORKER_RESOURCES((short)5, "worker_resources");
+    WORKER_RESOURCES((short)5, "worker_resources"),
+    OWNER((short)7, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -104,6 +107,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           return EXECUTOR_START_TIME_SECS;
         case 5: // WORKER_RESOURCES
           return WORKER_RESOURCES;
+        case 7: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -144,7 +149,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = 
{_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES};
+  private static final _Fields optionals[] = 
{_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -168,6 +173,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 NodeInfo.class), 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 WorkerResources.class))));
+    tmpMap.put(_Fields.OWNER, new 
org.apache.thrift.meta_data.FieldMetaData("owner", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class,
 metaDataMap);
   }
@@ -246,6 +253,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       this.worker_resources = __this__worker_resources;
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public Assignment deepCopy() {
@@ -263,6 +273,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
 
     this.worker_resources = new HashMap<NodeInfo,WorkerResources>();
 
+    this.owner = null;
   }
 
   public String get_master_code_dir() {
@@ -424,6 +435,29 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false 
otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case MASTER_CODE_DIR:
@@ -466,6 +500,14 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -486,6 +528,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     case WORKER_RESOURCES:
       return get_worker_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -507,6 +552,8 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       return is_set_executor_start_time_secs();
     case WORKER_RESOURCES:
       return is_set_worker_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -569,6 +616,15 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -601,6 +657,11 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
     if (present_worker_resources)
       list.add(worker_resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -662,6 +723,16 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, 
other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -729,6 +800,16 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -887,6 +968,14 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 7: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -979,6 +1068,13 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1010,7 +1106,10 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_worker_resources()) {
         optionals.set(3);
       }
-      oprot.writeBitSet(optionals, 4);
+      if (struct.is_set_owner()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
@@ -1063,6 +1162,9 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
           }
         }
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -1070,7 +1172,7 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.master_code_dir = iprot.readString();
       struct.set_master_code_dir_isSet(true);
-      BitSet incoming = iprot.readBitSet(4);
+      BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map652 = new 
org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, 
org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1152,6 +1254,10 @@ public class Assignment implements 
org.apache.thrift.TBase<Assignment, Assignmen
         }
         struct.set_worker_resources_isSet(true);
       }
+      if (incoming.get(4)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java 
b/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java
index b48d342..5071499 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java
@@ -58,6 +58,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   private static final org.apache.thrift.protocol.TField 
TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", 
org.apache.thrift.protocol.TType.STRING, (short)1);
   private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC 
= new org.apache.thrift.protocol.TField("executors", 
org.apache.thrift.protocol.TType.LIST, (short)2);
   private static final org.apache.thrift.protocol.TField RESOURCES_FIELD_DESC 
= new org.apache.thrift.protocol.TField("resources", 
org.apache.thrift.protocol.TType.STRUCT, (short)3);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = 
new org.apache.thrift.protocol.TField("owner", 
org.apache.thrift.protocol.TType.STRING, (short)5);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -68,12 +69,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   private String topology_id; // required
   private List<ExecutorInfo> executors; // required
   private WorkerResources resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     TOPOLOGY_ID((short)1, "topology_id"),
     EXECUTORS((short)2, "executors"),
-    RESOURCES((short)3, "resources");
+    RESOURCES((short)3, "resources"),
+    OWNER((short)5, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -94,6 +97,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
           return EXECUTORS;
         case 3: // RESOURCES
           return RESOURCES;
+        case 5: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -134,7 +139,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.RESOURCES};
+  private static final _Fields optionals[] = {_Fields.RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -145,6 +150,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 ExecutorInfo.class))));
     tmpMap.put(_Fields.RESOURCES, new 
org.apache.thrift.meta_data.FieldMetaData("resources", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 WorkerResources.class)));
+    tmpMap.put(_Fields.OWNER, new 
org.apache.thrift.meta_data.FieldMetaData("owner", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class,
 metaDataMap);
   }
@@ -178,6 +185,9 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     if (other.is_set_resources()) {
       this.resources = new WorkerResources(other.resources);
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public LocalAssignment deepCopy() {
@@ -189,6 +199,7 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     this.topology_id = null;
     this.executors = null;
     this.resources = null;
+    this.owner = null;
   }
 
   public String get_topology_id() {
@@ -275,6 +286,29 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false 
otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case TOPOLOGY_ID:
@@ -301,6 +335,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -315,6 +357,9 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     case RESOURCES:
       return get_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -332,6 +377,8 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       return is_set_executors();
     case RESOURCES:
       return is_set_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -376,6 +423,15 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -398,6 +454,11 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
     if (present_resources)
       list.add(resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -439,6 +500,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, 
other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -484,6 +555,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -574,6 +655,14 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 5: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -611,6 +700,13 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -640,10 +736,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
       if (struct.is_set_resources()) {
         optionals.set(0);
       }
-      oprot.writeBitSet(optionals, 1);
+      if (struct.is_set_owner()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
       if (struct.is_set_resources()) {
         struct.resources.write(oprot);
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -663,12 +765,16 @@ public class LocalAssignment implements 
org.apache.thrift.TBase<LocalAssignment,
         }
       }
       struct.set_executors_isSet(true);
-      BitSet incoming = iprot.readBitSet(1);
+      BitSet incoming = iprot.readBitSet(2);
       if (incoming.get(0)) {
         struct.resources = new WorkerResources();
         struct.resources.read(iprot);
         struct.set_resources_isSet(true);
       }
+      if (incoming.get(1)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/StormBase.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/StormBase.java 
b/storm-client/src/jvm/org/apache/storm/generated/StormBase.java
index 34b2358..e3114fb 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/StormBase.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/StormBase.java
@@ -64,6 +64,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private static final org.apache.thrift.protocol.TField 
TOPOLOGY_ACTION_OPTIONS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("topology_action_options", 
org.apache.thrift.protocol.TType.STRUCT, (short)7);
   private static final org.apache.thrift.protocol.TField 
PREV_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("prev_status", 
org.apache.thrift.protocol.TType.I32, (short)8);
   private static final org.apache.thrift.protocol.TField 
COMPONENT_DEBUG_FIELD_DESC = new 
org.apache.thrift.protocol.TField("component_debug", 
org.apache.thrift.protocol.TType.MAP, (short)9);
+  private static final org.apache.thrift.protocol.TField PRINCIPAL_FIELD_DESC 
= new org.apache.thrift.protocol.TField("principal", 
org.apache.thrift.protocol.TType.STRING, (short)10);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -80,6 +81,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private TopologyActionOptions topology_action_options; // optional
   private TopologyStatus prev_status; // optional
   private Map<String,DebugOptions> component_debug; // optional
+  private String principal; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -99,7 +101,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
      * @see TopologyStatus
      */
     PREV_STATUS((short)8, "prev_status"),
-    COMPONENT_DEBUG((short)9, "component_debug");
+    COMPONENT_DEBUG((short)9, "component_debug"),
+    PRINCIPAL((short)10, "principal");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -132,6 +135,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           return PREV_STATUS;
         case 9: // COMPONENT_DEBUG
           return COMPONENT_DEBUG;
+        case 10: // PRINCIPAL
+          return PRINCIPAL;
         default:
           return null;
       }
@@ -175,7 +180,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
   private static final int __NUM_WORKERS_ISSET_ID = 0;
   private static final int __LAUNCH_TIME_SECS_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = 
{_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG};
+  private static final _Fields optionals[] = 
{_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG,_Fields.PRINCIPAL};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -201,6 +206,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
 
             new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 DebugOptions.class))));
+    tmpMap.put(_Fields.PRINCIPAL, new 
org.apache.thrift.meta_data.FieldMetaData("principal", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormBase.class, 
metaDataMap);
   }
@@ -261,6 +268,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       this.component_debug = __this__component_debug;
     }
+    if (other.is_set_principal()) {
+      this.principal = other.principal;
+    }
   }
 
   public StormBase deepCopy() {
@@ -280,6 +290,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     this.topology_action_options = null;
     this.prev_status = null;
     this.component_debug = null;
+    this.principal = null;
   }
 
   public String get_name() {
@@ -525,6 +536,29 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     }
   }
 
+  public String get_principal() {
+    return this.principal;
+  }
+
+  public void set_principal(String principal) {
+    this.principal = principal;
+  }
+
+  public void unset_principal() {
+    this.principal = null;
+  }
+
+  /** Returns true if field principal is set (has been assigned a value) and 
false otherwise */
+  public boolean is_set_principal() {
+    return this.principal != null;
+  }
+
+  public void set_principal_isSet(boolean value) {
+    if (!value) {
+      this.principal = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case NAME:
@@ -599,6 +633,14 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       break;
 
+    case PRINCIPAL:
+      if (value == null) {
+        unset_principal();
+      } else {
+        set_principal((String)value);
+      }
+      break;
+
     }
   }
 
@@ -631,6 +673,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     case COMPONENT_DEBUG:
       return get_component_debug();
 
+    case PRINCIPAL:
+      return get_principal();
+
     }
     throw new IllegalStateException();
   }
@@ -660,6 +705,8 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       return is_set_prev_status();
     case COMPONENT_DEBUG:
       return is_set_component_debug();
+    case PRINCIPAL:
+      return is_set_principal();
     }
     throw new IllegalStateException();
   }
@@ -758,6 +805,15 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         return false;
     }
 
+    boolean this_present_principal = true && this.is_set_principal();
+    boolean that_present_principal = true && that.is_set_principal();
+    if (this_present_principal || that_present_principal) {
+      if (!(this_present_principal && that_present_principal))
+        return false;
+      if (!this.principal.equals(that.principal))
+        return false;
+    }
+
     return true;
   }
 
@@ -810,6 +866,11 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
     if (present_component_debug)
       list.add(component_debug);
 
+    boolean present_principal = true && (is_set_principal());
+    list.add(present_principal);
+    if (present_principal)
+      list.add(principal);
+
     return list.hashCode();
   }
 
@@ -911,6 +972,16 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         return lastComparison;
       }
     }
+    lastComparison = 
Boolean.valueOf(is_set_principal()).compareTo(other.is_set_principal());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_principal()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.principal, 
other.principal);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1006,6 +1077,16 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       }
       first = false;
     }
+    if (is_set_principal()) {
+      if (!first) sb.append(", ");
+      sb.append("principal:");
+      if (this.principal == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.principal);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1161,6 +1242,14 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 10: // PRINCIPAL
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.principal = iprot.readString();
+              struct.set_principal_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -1243,6 +1332,13 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           oprot.writeFieldEnd();
         }
       }
+      if (struct.principal != null) {
+        if (struct.is_set_principal()) {
+          oprot.writeFieldBegin(PRINCIPAL_FIELD_DESC);
+          oprot.writeString(struct.principal);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1282,7 +1378,10 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       if (struct.is_set_component_debug()) {
         optionals.set(5);
       }
-      oprot.writeBitSet(optionals, 6);
+      if (struct.is_set_principal()) {
+        optionals.set(6);
+      }
+      oprot.writeBitSet(optionals, 7);
       if (struct.is_set_component_executors()) {
         {
           oprot.writeI32(struct.component_executors.size());
@@ -1315,6 +1414,9 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
           }
         }
       }
+      if (struct.is_set_principal()) {
+        oprot.writeString(struct.principal);
+      }
     }
 
     @Override
@@ -1326,7 +1428,7 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
       struct.set_status_isSet(true);
       struct.num_workers = iprot.readI32();
       struct.set_num_workers_isSet(true);
-      BitSet incoming = iprot.readBitSet(6);
+      BitSet incoming = iprot.readBitSet(7);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map686 = new 
org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, 
org.apache.thrift.protocol.TType.I32, iprot.readI32());
@@ -1375,6 +1477,10 @@ public class StormBase implements 
org.apache.thrift.TBase<StormBase, StormBase._
         }
         struct.set_component_debug_isSet(true);
       }
+      if (incoming.get(6)) {
+        struct.principal = iprot.readString();
+        struct.set_principal_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java 
b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
index c2ddc15..520aa4f 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.ArrayList;
@@ -39,13 +40,12 @@ import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TopologyDetails {
-    private String topologyId;
-    private Map<String, Object> topologyConf;
-    private StormTopology topology;
-    private Map<ExecutorDetails, String> executorToComponent;
-    private int numWorkers;
+    private final String topologyId;
+    private final Map<String, Object> topologyConf;
+    private final StormTopology topology;
+    private final Map<ExecutorDetails, String> executorToComponent;
+    private final int numWorkers;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - 
type of that resource, Double - amount>>>
     private Map<ExecutorDetails, Map<String, Double>> resourceList;
     //Max heap size for a worker used by topology
@@ -53,21 +53,23 @@ public class TopologyDetails {
     //topology priority
     private Integer topologyPriority;
     //when topology was launched
-    private int launchTime;
+    private final int launchTime;
+    private final String owner;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TopologyDetails.class);
 
-    public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology, int numWorkers) {
-        this(topologyId, topologyConf, topology,  numWorkers,  null, 0);
+    public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology, int numWorkers, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  null, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents) {
-        this(topologyId, topologyConf, topology,  numWorkers,  
executorToComponents, 0);
+                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  
executorToComponents, 0, owner);
     }
 
-    public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, int launchTime) {
+    public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology, int numWorkers,
+                           Map<ExecutorDetails, String> executorToComponents, 
int launchTime, String owner) {
+        this.owner = owner;
         this.topologyId = topologyId;
         this.topologyConf = topologyConf;
         this.topology = topology;
@@ -466,12 +468,7 @@ public class TopologyDetails {
      * Get the user that submitted this topology
      */
     public String getTopologySubmitter() {
-        String user = (String) 
this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        if (user == null || user.equals("")) {
-            LOG.debug("Topology {} submitted by anonymous user", 
this.getName());
-            user = System.getProperty("user.name");
-        }
-        return user;
+        return owner;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java 
b/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
index f23063d..cec005e 100644
--- 
a/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
+++ 
b/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
@@ -20,6 +20,7 @@ package org.apache.storm.security;
 import org.apache.storm.daemon.Shutdownable;
 
 import java.util.Map;
+import org.apache.storm.generated.StormTopology;
 
 /**
  * Nimbus auto credential plugin that will be called on nimbus host
@@ -29,8 +30,8 @@ import java.util.Map;
 public interface INimbusCredentialPlugin extends Shutdownable {
 
     /**
-     * this method will be called when nimbus initializes.
-     * @param conf
+     * This method will be called when nimbus initializes.
+     * @param conf the cluster config
      */
     void prepare(Map<String, Object> conf);
 
@@ -41,7 +42,23 @@ public interface INimbusCredentialPlugin extends 
Shutdownable {
      * and stored in zookeeper.
      * @param credentials credentials map where more credentials will be added.
      * @param topologyConf topology configuration
-     * @return
      */
-    void populateCredentials(Map<String, String> credentials, Map<String, 
Object> topologyConf);
+    @Deprecated
+    default void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topologyConf) {
+        throw new IllegalStateException("One of the populateCredentials 
methods must be overridden");
+    }
+
+    /**
+     * Method that will be called on nimbus as part of submit topology. This 
plugin will be called
+     * at least once during the submit Topology action. It will be not be 
called during activate instead
+     * the credentials return by this method will be merged with the other 
credentials in the topology
+     * and stored in zookeeper.
+     * @param credentials credentials map where more credentials will be added.
+     * @param topoConf topology configuration
+     * @param topologyOwnerPrincipal the full principal name of the owner of 
the topology
+     */
+    @SuppressWarnings("deprecation")
+    default void populateCredentials(Map<String, String> credentials, 
Map<String, Object> topoConf, final String topologyOwnerPrincipal) {
+        populateCredentials(credentials, topoConf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
index 0f529e6..9662479 100644
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
@@ -29,12 +29,26 @@ public interface ICredentialsRenewer {
     * Called when initializing the service.
     * @param conf the storm cluster configuration.
     */ 
-   public void prepare(Map<String, Object> conf);
+   void prepare(Map<String, Object> conf);
 
     /**
      * Renew any credentials that need to be renewed. (Update the credentials 
if needed)
      * @param credentials the credentials that may have something to renew.
      * @param topologyConf topology configuration.
-     */ 
-    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf);
+     * @param topologyOwnerPrincipal the full principal name of the owner of 
the topology
+     */
+    @SuppressWarnings("deprecation")
+    default void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal) {
+        renew(credentials, topologyConf);
+    }
+
+    /**
+     * Renew any credentials that need to be renewed. (Update the credentials 
if needed)
+     * @param credentials the credentials that may have something to renew.
+     * @param topologyConf topology configuration.
+     */
+    @Deprecated
+    default void renew(Map<String, String>  credentials, Map<String, Object> 
topologyConf) {
+        throw new IllegalStateException("At least one of the renew methods 
must be overridden");
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index 5c9fa75..4a4c52b 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -202,7 +202,7 @@ public class AutoTGT implements IAutoCredentials, 
ICredentialsRenewer {
     }
 
     @Override
-    public void renew(Map<String,String> credentials, Map<String, Object> 
topologyConf) {
+    public void renew(Map<String,String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal) {
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             long refreshTime = getRefreshTime(tgt);

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py 
b/storm-client/src/py/storm/ttypes.py
index 4058f60..21c0dbf 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -9169,6 +9169,7 @@ class Assignment:
    - executor_node_port
    - executor_start_time_secs
    - worker_resources
+   - owner
   """
 
   thrift_spec = (
@@ -9182,9 +9183,11 @@ class Assignment:
     }, ), # 4
     (5, TType.MAP, 'worker_resources', (TType.STRUCT,(NodeInfo, 
NodeInfo.thrift_spec),TType.STRUCT,(WorkerResources, 
WorkerResources.thrift_spec)), {
     }, ), # 5
+    None, # 6
+    (7, TType.STRING, 'owner', None, None, ), # 7
   )
 
-  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], 
executor_node_port=thrift_spec[3][4], 
executor_start_time_secs=thrift_spec[4][4], 
worker_resources=thrift_spec[5][4],):
+  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], 
executor_node_port=thrift_spec[3][4], 
executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4], 
owner=None,):
     self.master_code_dir = master_code_dir
     if node_host is self.thrift_spec[2][4]:
       node_host = {
@@ -9202,6 +9205,7 @@ class Assignment:
       worker_resources = {
     }
     self.worker_resources = worker_resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9274,6 +9278,11 @@ class Assignment:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9326,6 +9335,10 @@ class Assignment:
         viter601.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 7)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9342,6 +9355,7 @@ class Assignment:
     value = (value * 31) ^ hash(self.executor_node_port)
     value = (value * 31) ^ hash(self.executor_start_time_secs)
     value = (value * 31) ^ hash(self.worker_resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):
@@ -9447,6 +9461,7 @@ class StormBase:
    - topology_action_options
    - prev_status
    - component_debug
+   - principal
   """
 
   thrift_spec = (
@@ -9460,9 +9475,10 @@ class StormBase:
     (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, 
TopologyActionOptions.thrift_spec), None, ), # 7
     (8, TType.I32, 'prev_status', None, None, ), # 8
     (9, TType.MAP, 'component_debug', 
(TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), 
None, ), # 9
+    (10, TType.STRING, 'principal', None, None, ), # 10
   )
 
-  def __init__(self, name=None, status=None, num_workers=None, 
component_executors=None, launch_time_secs=None, owner=None, 
topology_action_options=None, prev_status=None, component_debug=None,):
+  def __init__(self, name=None, status=None, num_workers=None, 
component_executors=None, launch_time_secs=None, owner=None, 
topology_action_options=None, prev_status=None, component_debug=None, 
principal=None,):
     self.name = name
     self.status = status
     self.num_workers = num_workers
@@ -9472,6 +9488,7 @@ class StormBase:
     self.topology_action_options = topology_action_options
     self.prev_status = prev_status
     self.component_debug = component_debug
+    self.principal = principal
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9541,6 +9558,11 @@ class StormBase:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 10:
+        if ftype == TType.STRING:
+          self.principal = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9595,6 +9617,10 @@ class StormBase:
         viter619.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.principal is not None:
+      oprot.writeFieldBegin('principal', TType.STRING, 10)
+      oprot.writeString(self.principal.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9619,6 +9645,7 @@ class StormBase:
     value = (value * 31) ^ hash(self.topology_action_options)
     value = (value * 31) ^ hash(self.prev_status)
     value = (value * 31) ^ hash(self.component_debug)
+    value = (value * 31) ^ hash(self.principal)
     return value
 
   def __repr__(self):
@@ -9922,6 +9949,7 @@ class LocalAssignment:
    - topology_id
    - executors
    - resources
+   - owner
   """
 
   thrift_spec = (
@@ -9929,12 +9957,15 @@ class LocalAssignment:
     (1, TType.STRING, 'topology_id', None, None, ), # 1
     (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, 
ExecutorInfo.thrift_spec)), None, ), # 2
     (3, TType.STRUCT, 'resources', (WorkerResources, 
WorkerResources.thrift_spec), None, ), # 3
+    None, # 4
+    (5, TType.STRING, 'owner', None, None, ), # 5
   )
 
-  def __init__(self, topology_id=None, executors=None, resources=None,):
+  def __init__(self, topology_id=None, executors=None, resources=None, 
owner=None,):
     self.topology_id = topology_id
     self.executors = executors
     self.resources = resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -9967,6 +9998,11 @@ class LocalAssignment:
           self.resources.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9992,6 +10028,10 @@ class LocalAssignment:
       oprot.writeFieldBegin('resources', TType.STRUCT, 3)
       self.resources.write(oprot)
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 5)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -10008,6 +10048,7 @@ class LocalAssignment:
     value = (value * 31) ^ hash(self.topology_id)
     value = (value * 31) ^ hash(self.executors)
     value = (value * 31) ^ hash(self.resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index 961c3cc..69682f6 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -476,6 +476,8 @@ struct Assignment {
     3: optional map<list<i64>, NodeInfo> executor_node_port = {};
     4: optional map<list<i64>, i64> executor_start_time_secs = {};
     5: optional map<NodeInfo, WorkerResources> worker_resources = {};
+    //6: from other pull request
+    7: optional string owner;
 }
 
 enum TopologyStatus {
@@ -500,6 +502,7 @@ struct StormBase {
     7: optional TopologyActionOptions topology_action_options;
     8: optional TopologyStatus prev_status;//currently only used during 
rebalance action.
     9: optional map<string, DebugOptions> component_debug; // 
topology/component level debug option.
+   10: optional string principal;
 }
 
 struct ClusterWorkerHeartbeat {
@@ -522,6 +525,8 @@ struct LocalAssignment {
   1: required string topology_id;
   2: required list<ExecutorInfo> executors;
   3: optional WorkerResources resources;
+  //4: other pull request
+  5: optional string owner;
 }
 
 struct LSSupervisorId {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java 
b/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java
index dbff9f2..d1763b1 100644
--- 
a/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java
+++ 
b/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java
@@ -69,7 +69,7 @@ public class AuthUtilsTestMock implements IAutoCredentials,
 
     // ICredentialsRenewer
     @Override
-    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf) {}
+    public void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String ownerPrincipal) {}
 
     // IAutoCredentials
     @Override
@@ -85,7 +85,7 @@ public class AuthUtilsTestMock implements IAutoCredentials,
 
     // INimbusCredentialPlugin
     @Override
-    public void populateCredentials(Map<String,String> credentials, 
Map<String, Object> conf) {}
+    public void populateCredentials(Map<String,String> credentials, 
Map<String, Object> topoConf) {}
 
     // Shutdownable via INimbusCredentailPlugin
     @Override

Reply via email to