User improvements
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbf7b998 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbf7b998 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbf7b998 Branch: refs/heads/master Commit: dbf7b998c04efb622f4d6ddc1ad4ae9d382e07a5 Parents: 2aba1c4 Author: Robert Evans <[email protected]> Authored: Fri Jul 21 12:31:23 2017 -0500 Committer: Robert Evans <[email protected]> Committed: Fri Jul 21 12:31:23 2017 -0500 ---------------------------------------------------------------------- .../AbstractHadoopNimbusPluginAutoCreds.java | 14 +- .../apache/storm/hbase/security/AutoHBase.java | 3 +- .../storm/hbase/security/AutoHBaseCommand.java | 5 +- .../storm/hbase/security/AutoHBaseNimbus.java | 16 +- .../apache/storm/hdfs/security/AutoHDFS.java | 3 +- .../storm/hdfs/security/AutoHDFSCommand.java | 5 +- .../storm/hdfs/security/AutoHDFSNimbus.java | 16 +- .../apache/storm/hive/security/AutoHive.java | 3 +- .../storm/hive/security/AutoHiveCommand.java | 5 +- .../storm/hive/security/AutoHiveNimbus.java | 13 +- .../storm/cluster/StormClusterStateImpl.java | 4 + .../storm/daemon/supervisor/AdvancedFSOps.java | 16 +- .../supervisor/ClientSupervisorUtils.java | 8 +- .../storm/daemon/supervisor/IAdvancedFSOps.java | 10 +- .../org/apache/storm/generated/Assignment.java | 114 +++++++++++- .../apache/storm/generated/LocalAssignment.java | 114 +++++++++++- .../org/apache/storm/generated/StormBase.java | 114 +++++++++++- .../apache/storm/scheduler/TopologyDetails.java | 35 ++-- .../storm/security/INimbusCredentialPlugin.java | 25 ++- .../security/auth/ICredentialsRenewer.java | 20 +- .../storm/security/auth/kerberos/AutoTGT.java | 2 +- storm-client/src/py/storm/ttypes.py | 47 ++++- storm-client/src/storm.thrift | 5 + .../storm/security/auth/AuthUtilsTestMock.java | 4 +- .../scheduler/multitenant_scheduler_test.clj | 77 ++++---- .../clj/org/apache/storm/scheduler_test.clj | 10 +- .../test/jvm/org/apache/storm/MockAutoCred.java | 4 +- .../org/apache/storm/daemon/nimbus/Nimbus.java | 61 +++++-- .../storm/daemon/supervisor/BasicContainer.java | 2 +- .../storm/daemon/supervisor/Container.java | 6 +- .../daemon/supervisor/ReadClusterState.java | 3 + .../storm/daemon/supervisor/Supervisor.java | 16 +- .../daemon/supervisor/SupervisorUtils.java | 7 +- .../daemon/supervisor/timer/UpdateBlobs.java | 13 +- .../apache/storm/localizer/AsyncLocalizer.java | 31 ++-- .../multitenant/MultitenantScheduler.java | 2 +- .../java/org/apache/storm/MessagingTest.java | 3 - .../storm/daemon/supervisor/ContainerTest.java | 6 +- .../storm/localizer/AsyncLocalizerTest.java | 8 +- .../resource/TestResourceAwareScheduler.java | 183 ++++++++++--------- .../storm/scheduler/resource/TestUser.java | 7 +- .../TestUtilsForResourceAwareScheduler.java | 32 ++-- .../eviction/TestDefaultEvictionStrategy.java | 118 ++++++------ .../TestDefaultResourceAwareStrategy.java | 8 +- 44 files changed, 819 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java index 89ec6f5..6f76b9f 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java @@ -49,7 +49,7 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds } @Override - public void populateCredentials(Map<String, String> credentials, Map<String, Object> topologyConf) { + public void populateCredentials(Map<String, String> credentials, Map<String, Object> topologyConf, final String topologyOwnerPrincipal) { try { List<String> configKeys = getConfigKeys(topologyConf); if (!configKeys.isEmpty()) { @@ -59,7 +59,7 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds } } else { credentials.put(getCredentialKey(StringUtils.EMPTY), - DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf))); + DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf, topologyOwnerPrincipal))); } LOG.info("Tokens added to credentials map."); } catch (Exception e) { @@ -68,8 +68,8 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds } @Override - public void renew(Map<String, String> credentials, Map<String, Object> topologyConf) { - doRenew(credentials, topologyConf); + public void renew(Map<String, String> credentials, Map<String, Object> topologyConf, final String topologyOwnerPrincipal) { + doRenew(credentials, topologyConf, topologyOwnerPrincipal); } protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials, @@ -113,11 +113,11 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds */ protected abstract String getConfigKeyString(); - protected abstract byte[] getHadoopCredentials(Map topologyConf, String configKey); + protected abstract byte[] getHadoopCredentials(Map topologyConf, String configKey, final String topologyOwnerPrincipal); - protected abstract byte[] getHadoopCredentials(Map topologyConf); + protected abstract byte[] getHadoopCredentials(Map topologyConf, final String topologyOwnerPrincipal); - protected abstract void doRenew(Map<String, String> credentials, Map topologyConf); + protected abstract void doRenew(Map<String, String> credentials, Map topologyConf, final String topologyOwnerPrincipal); protected List<String> getConfigKeys(Map conf) { String configKeyString = getConfigKeyString(); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java index 0218be5..5c3d0ce 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java @@ -42,4 +42,5 @@ public class AutoHBase extends AbstractHadoopAutoCreds { public String getCredentialKey(String configKey) { return HBASE_CREDENTIALS + configKey; } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java index b239816..e9e2a83 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java @@ -40,7 +40,6 @@ public final class AutoHBaseCommand { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. [email protected] conf.put(HBASE_PRINCIPAL_KEY, args[1]); // hbase principal [email protected] conf.put(HBASE_KEYTAB_FILE_KEY, args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab @@ -51,14 +50,14 @@ public final class AutoHBaseCommand { autoHBaseNimbus.prepare(conf); Map<String, String> creds = new HashMap<>(); - autoHBaseNimbus.populateCredentials(creds, conf); + autoHBaseNimbus.populateCredentials(creds, conf, args[0]); //with realm e.g. [email protected] LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds)); Subject s = new Subject(); autoHBase.populateSubject(s, creds); LOG.info("Got a Subject " + s); - autoHBaseNimbus.renew(creds, conf); + autoHBaseNimbus.renew(creds, conf, args[0]); LOG.info("renewed credentials" + autoHBase.getCredentials(creds)); } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java index ec85135..bd1e03a 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java @@ -66,14 +66,14 @@ public class AutoHBaseNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @Override - protected byte[] getHadoopCredentials(Map conf, String configKey) { + protected byte[] getHadoopCredentials(Map conf, String configKey, final String topologyOwnerPrincipal) { Configuration configuration = getHadoopConfiguration(conf, configKey); - return getHadoopCredentials(conf, configuration); + return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal); } @Override - protected byte[] getHadoopCredentials(Map conf) { - return getHadoopCredentials(conf, HBaseConfiguration.create()); + protected byte[] getHadoopCredentials(Map conf, final String topologyOwnerPrincipal) { + return getHadoopCredentials(conf, HBaseConfiguration.create(), topologyOwnerPrincipal); } private Configuration getHadoopConfiguration(Map topoConf, String configKey) { @@ -83,11 +83,9 @@ public class AutoHBaseNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @SuppressWarnings("unchecked") - protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf) { + protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf, final String topologySubmitterUser) { try { if(UserGroupInformation.isSecurityEnabled()) { - final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL); - UserProvider provider = UserProvider.instantiate(hbaseConf); provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName()); @@ -130,9 +128,9 @@ public class AutoHBaseNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @Override - public void doRenew(Map<String, String> credentials, Map topologyConf) { + public void doRenew(Map<String, String> credentials, Map topologyConf, final String topologySubmitterUser) { //HBASE tokens are not renewable so we always have to get new ones. - populateCredentials(credentials, topologyConf); + populateCredentials(credentials, topologyConf, topologySubmitterUser); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java index 4c2cb62..1af47d8 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java @@ -42,4 +42,5 @@ public class AutoHDFS extends AbstractHadoopAutoCreds { public String getCredentialKey(String configKey) { return HDFS_CREDENTIALS + configKey; } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java index 803e625..5f56d61 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java @@ -40,7 +40,6 @@ public final class AutoHDFSCommand { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. [email protected] conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. [email protected] conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab @@ -50,14 +49,14 @@ public final class AutoHDFSCommand { autoHDFSNimbus.prepare(conf); Map<String,String> creds = new HashMap<>(); - autoHDFSNimbus.populateCredentials(creds, conf); + autoHDFSNimbus.populateCredentials(creds, conf, args[0]); LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds)); Subject s = new Subject(); autoHDFS.populateSubject(s, creds); LOG.info("Got a Subject "+ s); - autoHDFSNimbus.renew(creds, conf); + autoHDFSNimbus.renew(creds, conf, args[0]); LOG.info("renewed credentials", autoHDFS.getCredentials(creds)); } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java index 78aef12..99b4ba8 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java @@ -74,14 +74,14 @@ public class AutoHDFSNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @Override - protected byte[] getHadoopCredentials(Map conf, String configKey) { + protected byte[] getHadoopCredentials(Map conf, String configKey, final String topologyOwnerPrincipal) { Configuration configuration = getHadoopConfiguration(conf, configKey); - return getHadoopCredentials(conf, configuration); + return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal); } @Override - protected byte[] getHadoopCredentials(Map conf) { - return getHadoopCredentials(conf, new Configuration()); + protected byte[] getHadoopCredentials(Map conf, final String topologyOwnerPrincipal) { + return getHadoopCredentials(conf, new Configuration(), topologyOwnerPrincipal); } private Configuration getHadoopConfiguration(Map topoConf, String configKey) { @@ -91,13 +91,11 @@ public class AutoHDFSNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @SuppressWarnings("unchecked") - private byte[] getHadoopCredentials(Map conf, final Configuration configuration) { + private byte[] getHadoopCredentials(Map conf, final Configuration configuration, final String topologySubmitterUser) { try { if(UserGroupInformation.isSecurityEnabled()) { login(configuration); - final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL); - final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString()) : FileSystem.getDefaultUri(configuration); @@ -147,7 +145,7 @@ public class AutoHDFSNimbus extends AbstractHadoopNimbusPluginAutoCreds { */ @Override @SuppressWarnings("unchecked") - public void doRenew(Map<String, String> credentials, Map topologyConf) { + public void doRenew(Map<String, String> credentials, Map topologyConf, final String topologyOwnerPrincipal) { List<String> confKeys = getConfigKeys(topologyConf); for (Pair<String, Credentials> cred : getCredentials(credentials, confKeys)) { try { @@ -168,7 +166,7 @@ public class AutoHDFSNimbus extends AbstractHadoopNimbusPluginAutoCreds { } catch (Exception e) { LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " + "renewal period so attempting to get new tokens.", e); - populateCredentials(credentials, topologyConf); + populateCredentials(credentials, topologyConf, topologyOwnerPrincipal); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java index d88f197..6010dd1 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java @@ -44,4 +44,5 @@ public class AutoHive extends AbstractHadoopAutoCreds { return HIVE_CREDENTIALS + configKey; } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java index 7aded11..9009a53 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java @@ -41,7 +41,6 @@ public final class AutoHiveCommand { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. [email protected] conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal [email protected] conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab /etc/security/keytabs/storm-hive.keytab conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]); // hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083" @@ -52,14 +51,14 @@ public final class AutoHiveCommand { autoHiveNimbus.prepare(conf); Map<String, String> creds = new HashMap<>(); - autoHiveNimbus.populateCredentials(creds, conf); + autoHiveNimbus.populateCredentials(creds, conf, args[0]); LOG.info("Got Hive credentials" + autoHive.getCredentials(creds)); Subject subject = new Subject(); autoHive.populateSubject(subject, creds); LOG.info("Got a Subject " + subject); - autoHiveNimbus.renew(creds, conf); + autoHiveNimbus.renew(creds, conf, args[0]); LOG.info("Renewed credentials" + autoHive.getCredentials(creds)); } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java index 5d76018..b60e15a 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java @@ -79,15 +79,15 @@ public class AutoHiveNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @Override - protected byte[] getHadoopCredentials(Map conf, String configKey) { + protected byte[] getHadoopCredentials(Map conf, String configKey, final String topologyOwnerPrincipal) { Configuration configuration = getHadoopConfiguration(conf, configKey); - return getHadoopCredentials(conf, configuration); + return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal); } @Override - protected byte[] getHadoopCredentials(Map conf) { + protected byte[] getHadoopCredentials(Map conf, final String topologyOwnerPrincipal) { Configuration configuration = new Configuration(); - return getHadoopCredentials(conf, configuration); + return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal); } private Configuration getHadoopConfiguration(Map topoConf, String configKey) { @@ -107,10 +107,9 @@ public class AutoHiveNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @SuppressWarnings("unchecked") - protected byte[] getHadoopCredentials(Map conf, final Configuration configuration) { + protected byte[] getHadoopCredentials(Map conf, final Configuration configuration, final String topologySubmitterUser) { try { if (UserGroupInformation.isSecurityEnabled()) { - String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL); String hiveMetaStoreURI = getMetaStoreURI(configuration); String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration); HiveConf hcatConf = createHiveConf(hiveMetaStoreURI, hiveMetaStorePrincipal); @@ -193,7 +192,7 @@ public class AutoHiveNimbus extends AbstractHadoopNimbusPluginAutoCreds { } @Override - public void doRenew(Map<String, String> credentials, Map topologyConf) { + public void doRenew(Map<String, String> credentials, Map topologyConf, final String topologyOwnerPrincipal) { List<String> configKeys = getConfigKeys(topologyConf); for (Pair<String, Credentials> cred : getCredentials(credentials, configKeys)) { try { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 1655323..1357584 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -567,6 +567,10 @@ public class StormClusterStateImpl implements IStormClusterState { if (StringUtils.isBlank(newElems.get_owner())) { newElems.set_owner(stormBase.get_owner()); } + if (StringUtils.isBlank(newElems.get_principal()) && stormBase.is_set_principal()) { + newElems.set_principal(stormBase.get_principal()); + } + if (newElems.get_topology_action_options() == null) { newElems.set_topology_action_options(stormBase.get_topology_action_options()); } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java index ba7f9db..04c4496 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -101,13 +101,13 @@ public class AdvancedFSOps implements IAdvancedFSOps { } @Override - public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { - ClientSupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath()); + public void setupStormCodeDir(String user, File path) throws IOException { + ClientSupervisorUtils.setupStormCodeDir(_conf, user, path.getCanonicalPath()); } @Override - public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException { - ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath()); + public void setupWorkerArtifactsDir(String user, File path) throws IOException { + ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, user, path.getCanonicalPath()); } } @@ -232,21 +232,21 @@ public class AdvancedFSOps implements IAdvancedFSOps { /** * Setup the permissions for the storm code dir - * @param topologyConf the config of the Topology + * @param user the user that owns the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { + public void setupStormCodeDir(String user, File path) throws IOException { //By default this is a NOOP } /** * Setup the permissions for the worker artifacts dirs - * @param topologyConf the config of the Topology + * @param user the user that owns the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException { + public void setupWorkerArtifactsDir(String user, File path) throws IOException { //By default this is a NOOP } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java index b898aee..29e75ed 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java @@ -145,23 +145,23 @@ public class ClientSupervisorUtils { return process; } - public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> topoConf, String dir) throws IOException { + public static void setupStormCodeDir(Map<String, Object> conf, String user, String dir) throws IOException { if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { String logPrefix = "Storm Code Dir Setup for " + dir; List<String> commands = new ArrayList<>(); commands.add("code-dir"); commands.add(dir); - processLauncherAndWait(conf, (String) (topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + processLauncherAndWait(conf, user, commands, null, logPrefix); } } - public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> topoConf, String dir) throws IOException { + public static void setupWorkerArtifactsDir(Map<String, Object> conf, String user, String dir) throws IOException { if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { String logPrefix = "Worker Artifacts Setup for " + dir; List<String> commands = new ArrayList<>(); commands.add("artifacts-dir"); commands.add(dir); - processLauncherAndWait(conf, (String) (topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + processLauncherAndWait(conf, user, commands, null, logPrefix); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java index e5f5db0..5f23774 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java @@ -83,19 +83,19 @@ public interface IAdvancedFSOps { /** * Setup the permissions for the storm code dir - * @param topologyConf the config of the Topology + * @param user the owner of the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException; + void setupStormCodeDir(String user, File path) throws IOException; /** * Setup the permissions for the worker artifacts dirs - * @param topologyConf the config of the Topology + * @param user the owner of the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException; + void setupWorkerArtifactsDir(String user, File path) throws IOException; /** * Sanity check if everything the topology needs is there for it to run. @@ -157,7 +157,7 @@ public interface IAdvancedFSOps { /** * Read the contents of a file into a byte array. - * @param localtion the file to read + * @param location the file to read * @return the contents of the file * @throws IOException on any error */ http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/Assignment.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/generated/Assignment.java b/storm-client/src/jvm/org/apache/storm/generated/Assignment.java index 4c973d5..394f934 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/Assignment.java +++ b/storm-client/src/jvm/org/apache/storm/generated/Assignment.java @@ -60,6 +60,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen private static final org.apache.thrift.protocol.TField EXECUTOR_NODE_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_node_port", org.apache.thrift.protocol.TType.MAP, (short)3); private static final org.apache.thrift.protocol.TField EXECUTOR_START_TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_start_time_secs", org.apache.thrift.protocol.TType.MAP, (short)4); private static final org.apache.thrift.protocol.TField WORKER_RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_resources", org.apache.thrift.protocol.TType.MAP, (short)5); + private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)7); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -72,6 +73,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen private Map<List<Long>,NodeInfo> executor_node_port; // optional private Map<List<Long>,Long> executor_start_time_secs; // optional private Map<NodeInfo,WorkerResources> worker_resources; // optional + private String owner; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -79,7 +81,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen NODE_HOST((short)2, "node_host"), EXECUTOR_NODE_PORT((short)3, "executor_node_port"), EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs"), - WORKER_RESOURCES((short)5, "worker_resources"); + WORKER_RESOURCES((short)5, "worker_resources"), + OWNER((short)7, "owner"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -104,6 +107,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return EXECUTOR_START_TIME_SECS; case 5: // WORKER_RESOURCES return WORKER_RESOURCES; + case 7: // OWNER + return OWNER; default: return null; } @@ -144,7 +149,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } // isset id assignments - private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES}; + private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES,_Fields.OWNER}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -168,6 +173,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class), new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class)))); + tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class, metaDataMap); } @@ -246,6 +253,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } this.worker_resources = __this__worker_resources; } + if (other.is_set_owner()) { + this.owner = other.owner; + } } public Assignment deepCopy() { @@ -263,6 +273,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen this.worker_resources = new HashMap<NodeInfo,WorkerResources>(); + this.owner = null; } public String get_master_code_dir() { @@ -424,6 +435,29 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } } + public String get_owner() { + return this.owner; + } + + public void set_owner(String owner) { + this.owner = owner; + } + + public void unset_owner() { + this.owner = null; + } + + /** Returns true if field owner is set (has been assigned a value) and false otherwise */ + public boolean is_set_owner() { + return this.owner != null; + } + + public void set_owner_isSet(boolean value) { + if (!value) { + this.owner = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case MASTER_CODE_DIR: @@ -466,6 +500,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } break; + case OWNER: + if (value == null) { + unset_owner(); + } else { + set_owner((String)value); + } + break; + } } @@ -486,6 +528,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case WORKER_RESOURCES: return get_worker_resources(); + case OWNER: + return get_owner(); + } throw new IllegalStateException(); } @@ -507,6 +552,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return is_set_executor_start_time_secs(); case WORKER_RESOURCES: return is_set_worker_resources(); + case OWNER: + return is_set_owner(); } throw new IllegalStateException(); } @@ -569,6 +616,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return false; } + boolean this_present_owner = true && this.is_set_owner(); + boolean that_present_owner = true && that.is_set_owner(); + if (this_present_owner || that_present_owner) { + if (!(this_present_owner && that_present_owner)) + return false; + if (!this.owner.equals(that.owner)) + return false; + } + return true; } @@ -601,6 +657,11 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (present_worker_resources) list.add(worker_resources); + boolean present_owner = true && (is_set_owner()); + list.add(present_owner); + if (present_owner) + list.add(owner); + return list.hashCode(); } @@ -662,6 +723,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_owner()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -729,6 +800,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } first = false; } + if (is_set_owner()) { + if (!first) sb.append(", "); + sb.append("owner:"); + if (this.owner == null) { + sb.append("null"); + } else { + sb.append(this.owner); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -887,6 +968,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // OWNER + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -979,6 +1068,13 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldEnd(); } } + if (struct.owner != null) { + if (struct.is_set_owner()) { + oprot.writeFieldBegin(OWNER_FIELD_DESC); + oprot.writeString(struct.owner); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1010,7 +1106,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (struct.is_set_worker_resources()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.is_set_owner()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.is_set_node_host()) { { oprot.writeI32(struct.node_host.size()); @@ -1063,6 +1162,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } } } + if (struct.is_set_owner()) { + oprot.writeString(struct.owner); + } } @Override @@ -1070,7 +1172,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen TTupleProtocol iprot = (TTupleProtocol) prot; struct.master_code_dir = iprot.readString(); struct.set_master_code_dir_isSet(true); - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map652 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); @@ -1152,6 +1254,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } struct.set_worker_resources_isSet(true); } + if (incoming.get(4)) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java b/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java index b48d342..5071499 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java +++ b/storm-client/src/jvm/org/apache/storm/generated/LocalAssignment.java @@ -58,6 +58,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, private static final org.apache.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift.protocol.TField("executors", org.apache.thrift.protocol.TType.LIST, (short)2); private static final org.apache.thrift.protocol.TField RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("resources", org.apache.thrift.protocol.TType.STRUCT, (short)3); + private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)5); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -68,12 +69,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, private String topology_id; // required private List<ExecutorInfo> executors; // required private WorkerResources resources; // optional + private String owner; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { TOPOLOGY_ID((short)1, "topology_id"), EXECUTORS((short)2, "executors"), - RESOURCES((short)3, "resources"); + RESOURCES((short)3, "resources"), + OWNER((short)5, "owner"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -94,6 +97,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return EXECUTORS; case 3: // RESOURCES return RESOURCES; + case 5: // OWNER + return OWNER; default: return null; } @@ -134,7 +139,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } // isset id assignments - private static final _Fields optionals[] = {_Fields.RESOURCES}; + private static final _Fields optionals[] = {_Fields.RESOURCES,_Fields.OWNER}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -145,6 +150,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorInfo.class)))); tmpMap.put(_Fields.RESOURCES, new org.apache.thrift.meta_data.FieldMetaData("resources", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class))); + tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class, metaDataMap); } @@ -178,6 +185,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (other.is_set_resources()) { this.resources = new WorkerResources(other.resources); } + if (other.is_set_owner()) { + this.owner = other.owner; + } } public LocalAssignment deepCopy() { @@ -189,6 +199,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, this.topology_id = null; this.executors = null; this.resources = null; + this.owner = null; } public String get_topology_id() { @@ -275,6 +286,29 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } } + public String get_owner() { + return this.owner; + } + + public void set_owner(String owner) { + this.owner = owner; + } + + public void unset_owner() { + this.owner = null; + } + + /** Returns true if field owner is set (has been assigned a value) and false otherwise */ + public boolean is_set_owner() { + return this.owner != null; + } + + public void set_owner_isSet(boolean value) { + if (!value) { + this.owner = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TOPOLOGY_ID: @@ -301,6 +335,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } break; + case OWNER: + if (value == null) { + unset_owner(); + } else { + set_owner((String)value); + } + break; + } } @@ -315,6 +357,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, case RESOURCES: return get_resources(); + case OWNER: + return get_owner(); + } throw new IllegalStateException(); } @@ -332,6 +377,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return is_set_executors(); case RESOURCES: return is_set_resources(); + case OWNER: + return is_set_owner(); } throw new IllegalStateException(); } @@ -376,6 +423,15 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return false; } + boolean this_present_owner = true && this.is_set_owner(); + boolean that_present_owner = true && that.is_set_owner(); + if (this_present_owner || that_present_owner) { + if (!(this_present_owner && that_present_owner)) + return false; + if (!this.owner.equals(that.owner)) + return false; + } + return true; } @@ -398,6 +454,11 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (present_resources) list.add(resources); + boolean present_owner = true && (is_set_owner()); + list.add(present_owner); + if (present_owner) + list.add(owner); + return list.hashCode(); } @@ -439,6 +500,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_owner()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -484,6 +555,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } first = false; } + if (is_set_owner()) { + if (!first) sb.append(", "); + sb.append("owner:"); + if (this.owner == null) { + sb.append("null"); + } else { + sb.append(this.owner); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -574,6 +655,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // OWNER + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -611,6 +700,13 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, oprot.writeFieldEnd(); } } + if (struct.owner != null) { + if (struct.is_set_owner()) { + oprot.writeFieldBegin(OWNER_FIELD_DESC); + oprot.writeString(struct.owner); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -640,10 +736,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (struct.is_set_resources()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.is_set_owner()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); if (struct.is_set_resources()) { struct.resources.write(oprot); } + if (struct.is_set_owner()) { + oprot.writeString(struct.owner); + } } @Override @@ -663,12 +765,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } } struct.set_executors_isSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { struct.resources = new WorkerResources(); struct.resources.read(iprot); struct.set_resources_isSet(true); } + if (incoming.get(1)) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/generated/StormBase.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/generated/StormBase.java b/storm-client/src/jvm/org/apache/storm/generated/StormBase.java index 34b2358..e3114fb 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/StormBase.java +++ b/storm-client/src/jvm/org/apache/storm/generated/StormBase.java @@ -64,6 +64,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private static final org.apache.thrift.protocol.TField TOPOLOGY_ACTION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_action_options", org.apache.thrift.protocol.TType.STRUCT, (short)7); private static final org.apache.thrift.protocol.TField PREV_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("prev_status", org.apache.thrift.protocol.TType.I32, (short)8); private static final org.apache.thrift.protocol.TField COMPONENT_DEBUG_FIELD_DESC = new org.apache.thrift.protocol.TField("component_debug", org.apache.thrift.protocol.TType.MAP, (short)9); + private static final org.apache.thrift.protocol.TField PRINCIPAL_FIELD_DESC = new org.apache.thrift.protocol.TField("principal", org.apache.thrift.protocol.TType.STRING, (short)10); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -80,6 +81,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private TopologyActionOptions topology_action_options; // optional private TopologyStatus prev_status; // optional private Map<String,DebugOptions> component_debug; // optional + private String principal; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -99,7 +101,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ * @see TopologyStatus */ PREV_STATUS((short)8, "prev_status"), - COMPONENT_DEBUG((short)9, "component_debug"); + COMPONENT_DEBUG((short)9, "component_debug"), + PRINCIPAL((short)10, "principal"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -132,6 +135,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return PREV_STATUS; case 9: // COMPONENT_DEBUG return COMPONENT_DEBUG; + case 10: // PRINCIPAL + return PRINCIPAL; default: return null; } @@ -175,7 +180,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private static final int __NUM_WORKERS_ISSET_ID = 0; private static final int __LAUNCH_TIME_SECS_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG}; + private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG,_Fields.PRINCIPAL}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -201,6 +206,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, DebugOptions.class)))); + tmpMap.put(_Fields.PRINCIPAL, new org.apache.thrift.meta_data.FieldMetaData("principal", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormBase.class, metaDataMap); } @@ -261,6 +268,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } this.component_debug = __this__component_debug; } + if (other.is_set_principal()) { + this.principal = other.principal; + } } public StormBase deepCopy() { @@ -280,6 +290,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ this.topology_action_options = null; this.prev_status = null; this.component_debug = null; + this.principal = null; } public String get_name() { @@ -525,6 +536,29 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } } + public String get_principal() { + return this.principal; + } + + public void set_principal(String principal) { + this.principal = principal; + } + + public void unset_principal() { + this.principal = null; + } + + /** Returns true if field principal is set (has been assigned a value) and false otherwise */ + public boolean is_set_principal() { + return this.principal != null; + } + + public void set_principal_isSet(boolean value) { + if (!value) { + this.principal = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case NAME: @@ -599,6 +633,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } break; + case PRINCIPAL: + if (value == null) { + unset_principal(); + } else { + set_principal((String)value); + } + break; + } } @@ -631,6 +673,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ case COMPONENT_DEBUG: return get_component_debug(); + case PRINCIPAL: + return get_principal(); + } throw new IllegalStateException(); } @@ -660,6 +705,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return is_set_prev_status(); case COMPONENT_DEBUG: return is_set_component_debug(); + case PRINCIPAL: + return is_set_principal(); } throw new IllegalStateException(); } @@ -758,6 +805,15 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return false; } + boolean this_present_principal = true && this.is_set_principal(); + boolean that_present_principal = true && that.is_set_principal(); + if (this_present_principal || that_present_principal) { + if (!(this_present_principal && that_present_principal)) + return false; + if (!this.principal.equals(that.principal)) + return false; + } + return true; } @@ -810,6 +866,11 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ if (present_component_debug) list.add(component_debug); + boolean present_principal = true && (is_set_principal()); + list.add(present_principal); + if (present_principal) + list.add(principal); + return list.hashCode(); } @@ -911,6 +972,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_principal()).compareTo(other.is_set_principal()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_principal()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.principal, other.principal); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1006,6 +1077,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } first = false; } + if (is_set_principal()) { + if (!first) sb.append(", "); + sb.append("principal:"); + if (this.principal == null) { + sb.append("null"); + } else { + sb.append(this.principal); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -1161,6 +1242,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 10: // PRINCIPAL + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.principal = iprot.readString(); + struct.set_principal_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1243,6 +1332,13 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ oprot.writeFieldEnd(); } } + if (struct.principal != null) { + if (struct.is_set_principal()) { + oprot.writeFieldBegin(PRINCIPAL_FIELD_DESC); + oprot.writeString(struct.principal); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1282,7 +1378,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ if (struct.is_set_component_debug()) { optionals.set(5); } - oprot.writeBitSet(optionals, 6); + if (struct.is_set_principal()) { + optionals.set(6); + } + oprot.writeBitSet(optionals, 7); if (struct.is_set_component_executors()) { { oprot.writeI32(struct.component_executors.size()); @@ -1315,6 +1414,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } } } + if (struct.is_set_principal()) { + oprot.writeString(struct.principal); + } } @Override @@ -1326,7 +1428,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ struct.set_status_isSet(true); struct.num_workers = iprot.readI32(); struct.set_num_workers_isSet(true); - BitSet incoming = iprot.readBitSet(6); + BitSet incoming = iprot.readBitSet(7); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map686 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I32, iprot.readI32()); @@ -1375,6 +1477,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } struct.set_component_debug_isSet(true); } + if (incoming.get(6)) { + struct.principal = iprot.readString(); + struct.set_principal_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java index c2ddc15..520aa4f 100644 --- a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler; import java.util.ArrayList; @@ -39,13 +40,12 @@ import org.apache.storm.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TopologyDetails { - private String topologyId; - private Map<String, Object> topologyConf; - private StormTopology topology; - private Map<ExecutorDetails, String> executorToComponent; - private int numWorkers; + private final String topologyId; + private final Map<String, Object> topologyConf; + private final StormTopology topology; + private final Map<ExecutorDetails, String> executorToComponent; + private final int numWorkers; //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>> private Map<ExecutorDetails, Map<String, Double>> resourceList; //Max heap size for a worker used by topology @@ -53,21 +53,23 @@ public class TopologyDetails { //topology priority private Integer topologyPriority; //when topology was launched - private int launchTime; + private final int launchTime; + private final String owner; private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class); - public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers) { - this(topologyId, topologyConf, topology, numWorkers, null, 0); + public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, String owner) { + this(topologyId, topologyConf, topology, numWorkers, null, 0, owner); } public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, - int numWorkers, Map<ExecutorDetails, String> executorToComponents) { - this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0); + int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) { + this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner); } - public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, - int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) { + public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, + Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) { + this.owner = owner; this.topologyId = topologyId; this.topologyConf = topologyConf; this.topology = topology; @@ -466,12 +468,7 @@ public class TopologyDetails { * Get the user that submitted this topology */ public String getTopologySubmitter() { - String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER); - if (user == null || user.equals("")) { - LOG.debug("Topology {} submitted by anonymous user", this.getName()); - user = System.getProperty("user.name"); - } - return user; + return owner; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java b/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java index f23063d..cec005e 100644 --- a/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java @@ -20,6 +20,7 @@ package org.apache.storm.security; import org.apache.storm.daemon.Shutdownable; import java.util.Map; +import org.apache.storm.generated.StormTopology; /** * Nimbus auto credential plugin that will be called on nimbus host @@ -29,8 +30,8 @@ import java.util.Map; public interface INimbusCredentialPlugin extends Shutdownable { /** - * this method will be called when nimbus initializes. - * @param conf + * This method will be called when nimbus initializes. + * @param conf the cluster config */ void prepare(Map<String, Object> conf); @@ -41,7 +42,23 @@ public interface INimbusCredentialPlugin extends Shutdownable { * and stored in zookeeper. * @param credentials credentials map where more credentials will be added. * @param topologyConf topology configuration - * @return */ - void populateCredentials(Map<String, String> credentials, Map<String, Object> topologyConf); + @Deprecated + default void populateCredentials(Map<String, String> credentials, Map<String, Object> topologyConf) { + throw new IllegalStateException("One of the populateCredentials methods must be overridden"); + } + + /** + * Method that will be called on nimbus as part of submit topology. This plugin will be called + * at least once during the submit Topology action. It will be not be called during activate instead + * the credentials return by this method will be merged with the other credentials in the topology + * and stored in zookeeper. + * @param credentials credentials map where more credentials will be added. + * @param topoConf topology configuration + * @param topologyOwnerPrincipal the full principal name of the owner of the topology + */ + @SuppressWarnings("deprecation") + default void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf, final String topologyOwnerPrincipal) { + populateCredentials(credentials, topoConf); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java index 0f529e6..9662479 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java @@ -29,12 +29,26 @@ public interface ICredentialsRenewer { * Called when initializing the service. * @param conf the storm cluster configuration. */ - public void prepare(Map<String, Object> conf); + void prepare(Map<String, Object> conf); /** * Renew any credentials that need to be renewed. (Update the credentials if needed) * @param credentials the credentials that may have something to renew. * @param topologyConf topology configuration. - */ - public void renew(Map<String, String> credentials, Map<String, Object> topologyConf); + * @param topologyOwnerPrincipal the full principal name of the owner of the topology + */ + @SuppressWarnings("deprecation") + default void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) { + renew(credentials, topologyConf); + } + + /** + * Renew any credentials that need to be renewed. (Update the credentials if needed) + * @param credentials the credentials that may have something to renew. + * @param topologyConf topology configuration. + */ + @Deprecated + default void renew(Map<String, String> credentials, Map<String, Object> topologyConf) { + throw new IllegalStateException("At least one of the renew methods must be overridden"); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java index 5c9fa75..4a4c52b 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java @@ -202,7 +202,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } @Override - public void renew(Map<String,String> credentials, Map<String, Object> topologyConf) { + public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) { KerberosTicket tgt = getTGT(credentials); if (tgt != null) { long refreshTime = getRefreshTime(tgt); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index 4058f60..21c0dbf 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -9169,6 +9169,7 @@ class Assignment: - executor_node_port - executor_start_time_secs - worker_resources + - owner """ thrift_spec = ( @@ -9182,9 +9183,11 @@ class Assignment: }, ), # 4 (5, TType.MAP, 'worker_resources', (TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec),TType.STRUCT,(WorkerResources, WorkerResources.thrift_spec)), { }, ), # 5 + None, # 6 + (7, TType.STRING, 'owner', None, None, ), # 7 ) - def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4],): + def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4], owner=None,): self.master_code_dir = master_code_dir if node_host is self.thrift_spec[2][4]: node_host = { @@ -9202,6 +9205,7 @@ class Assignment: worker_resources = { } self.worker_resources = worker_resources + self.owner = owner def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9274,6 +9278,11 @@ class Assignment: iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9326,6 +9335,10 @@ class Assignment: viter601.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 7) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9342,6 +9355,7 @@ class Assignment: value = (value * 31) ^ hash(self.executor_node_port) value = (value * 31) ^ hash(self.executor_start_time_secs) value = (value * 31) ^ hash(self.worker_resources) + value = (value * 31) ^ hash(self.owner) return value def __repr__(self): @@ -9447,6 +9461,7 @@ class StormBase: - topology_action_options - prev_status - component_debug + - principal """ thrift_spec = ( @@ -9460,9 +9475,10 @@ class StormBase: (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7 (8, TType.I32, 'prev_status', None, None, ), # 8 (9, TType.MAP, 'component_debug', (TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), None, ), # 9 + (10, TType.STRING, 'principal', None, None, ), # 10 ) - def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None,): + def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None, principal=None,): self.name = name self.status = status self.num_workers = num_workers @@ -9472,6 +9488,7 @@ class StormBase: self.topology_action_options = topology_action_options self.prev_status = prev_status self.component_debug = component_debug + self.principal = principal def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9541,6 +9558,11 @@ class StormBase: iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 10: + if ftype == TType.STRING: + self.principal = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9595,6 +9617,10 @@ class StormBase: viter619.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.principal is not None: + oprot.writeFieldBegin('principal', TType.STRING, 10) + oprot.writeString(self.principal.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9619,6 +9645,7 @@ class StormBase: value = (value * 31) ^ hash(self.topology_action_options) value = (value * 31) ^ hash(self.prev_status) value = (value * 31) ^ hash(self.component_debug) + value = (value * 31) ^ hash(self.principal) return value def __repr__(self): @@ -9922,6 +9949,7 @@ class LocalAssignment: - topology_id - executors - resources + - owner """ thrift_spec = ( @@ -9929,12 +9957,15 @@ class LocalAssignment: (1, TType.STRING, 'topology_id', None, None, ), # 1 (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2 (3, TType.STRUCT, 'resources', (WorkerResources, WorkerResources.thrift_spec), None, ), # 3 + None, # 4 + (5, TType.STRING, 'owner', None, None, ), # 5 ) - def __init__(self, topology_id=None, executors=None, resources=None,): + def __init__(self, topology_id=None, executors=None, resources=None, owner=None,): self.topology_id = topology_id self.executors = executors self.resources = resources + self.owner = owner def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9967,6 +9998,11 @@ class LocalAssignment: self.resources.read(iprot) else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9992,6 +10028,10 @@ class LocalAssignment: oprot.writeFieldBegin('resources', TType.STRUCT, 3) self.resources.write(oprot) oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 5) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -10008,6 +10048,7 @@ class LocalAssignment: value = (value * 31) ^ hash(self.topology_id) value = (value * 31) ^ hash(self.executors) value = (value * 31) ^ hash(self.resources) + value = (value * 31) ^ hash(self.owner) return value def __repr__(self): http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index 961c3cc..69682f6 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -476,6 +476,8 @@ struct Assignment { 3: optional map<list<i64>, NodeInfo> executor_node_port = {}; 4: optional map<list<i64>, i64> executor_start_time_secs = {}; 5: optional map<NodeInfo, WorkerResources> worker_resources = {}; + //6: from other pull request + 7: optional string owner; } enum TopologyStatus { @@ -500,6 +502,7 @@ struct StormBase { 7: optional TopologyActionOptions topology_action_options; 8: optional TopologyStatus prev_status;//currently only used during rebalance action. 9: optional map<string, DebugOptions> component_debug; // topology/component level debug option. + 10: optional string principal; } struct ClusterWorkerHeartbeat { @@ -522,6 +525,8 @@ struct LocalAssignment { 1: required string topology_id; 2: required list<ExecutorInfo> executors; 3: optional WorkerResources resources; + //4: other pull request + 5: optional string owner; } struct LSSupervisorId { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java b/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java index dbff9f2..d1763b1 100644 --- a/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java +++ b/storm-client/test/jvm/org/apache/storm/security/auth/AuthUtilsTestMock.java @@ -69,7 +69,7 @@ public class AuthUtilsTestMock implements IAutoCredentials, // ICredentialsRenewer @Override - public void renew(Map<String, String> credentials, Map<String, Object> topologyConf) {} + public void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String ownerPrincipal) {} // IAutoCredentials @Override @@ -85,7 +85,7 @@ public class AuthUtilsTestMock implements IAutoCredentials, // INimbusCredentialPlugin @Override - public void populateCredentials(Map<String,String> credentials, Map<String, Object> conf) {} + public void populateCredentials(Map<String,String> credentials, Map<String, Object> topoConf) {} // Shutdownable via INimbusCredentailPlugin @Override
