User improvements
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9980b68d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9980b68d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9980b68d Branch: refs/heads/1.1.x-branch Commit: 9980b68d521dfdf5a83577ac361949293751c618 Parents: d83b0b9 Author: Robert Evans <[email protected]> Authored: Fri Jul 21 13:50:15 2017 -0500 Committer: Robert Evans <[email protected]> Committed: Fri Jul 21 13:50:15 2017 -0500 ---------------------------------------------------------------------- .../backends/trident/TestPlanCompiler.java | 2 +- .../apache/storm/hbase/security/AutoHBase.java | 34 ++-- .../storm/hdfs/common/security/AutoHDFS.java | 40 ++-- .../src/clj/org/apache/storm/converter.clj | 11 +- .../src/clj/org/apache/storm/daemon/common.clj | 4 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 71 +++++-- .../storm/cluster/StormClusterStateImpl.java | 4 + .../storm/daemon/supervisor/AdvancedFSOps.java | 16 +- .../storm/daemon/supervisor/Container.java | 6 +- .../daemon/supervisor/ReadClusterState.java | 3 + .../storm/daemon/supervisor/Supervisor.java | 16 +- .../daemon/supervisor/SupervisorUtils.java | 11 +- .../daemon/supervisor/timer/UpdateBlobs.java | 17 +- .../org/apache/storm/generated/Assignment.java | 114 +++++++++++- .../apache/storm/generated/LocalAssignment.java | 114 +++++++++++- .../org/apache/storm/generated/StormBase.java | 114 +++++++++++- .../apache/storm/localizer/AsyncLocalizer.java | 37 ++-- .../apache/storm/scheduler/TopologyDetails.java | 33 ++-- .../multitenant/MultitenantScheduler.java | 2 +- .../storm/security/INimbusCredentialPlugin.java | 24 ++- .../security/auth/ICredentialsRenewer.java | 18 +- .../storm/security/auth/kerberos/AutoTGT.java | 6 +- storm-core/src/py/storm/ttypes.py | 47 ++++- storm-core/src/storm.thrift | 5 + .../test/clj/org/apache/storm/cluster_test.clj | 8 +- .../scheduler/multitenant_scheduler_test.clj | 77 ++++---- .../scheduler/resource_aware_scheduler_test.clj | 49 ++--- .../clj/org/apache/storm/scheduler_test.clj | 10 +- .../storm/daemon/supervisor/ContainerTest.java | 6 +- .../storm/localizer/AsyncLocalizerTest.java | 8 +- .../resource/TestResourceAwareScheduler.java | 183 ++++++++++--------- .../storm/scheduler/resource/TestUser.java | 7 +- .../TestUtilsForResourceAwareScheduler.java | 34 ++-- .../eviction/TestDefaultEvictionStrategy.java | 118 ++++++------ .../TestDefaultResourceAwareStrategy.java | 8 +- 35 files changed, 858 insertions(+), 399 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java index 7b936df..1bf6e9c 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java @@ -51,7 +51,7 @@ import java.util.concurrent.Callable; import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues; public class TestPlanCompiler { - private static LocalCluster cluster; + private static ILocalCluster cluster; @BeforeClass public static void staticSetup() throws Exception { http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java index a2ca68e..f20ee02 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java @@ -71,9 +71,9 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus } @Override - public void populateCredentials(Map<String, String> credentials, Map conf) { + public void populateCredentials(Map<String, String> credentials, Map<String, Object> conf, String owner) { try { - credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf))); + credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf, owner))); } catch (Exception e) { LOG.error("Could not populate HBase credentials.", e); } @@ -163,12 +163,10 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus } @SuppressWarnings("unchecked") - protected byte[] getHadoopCredentials(Map conf) { + protected byte[] getHadoopCredentials(Map<String, Object> conf, final String topologyOwnerPrincipal) { try { final Configuration hbaseConf = HBaseConfiguration.create(); if(UserGroupInformation.isSecurityEnabled()) { - final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL); - UserProvider provider = UserProvider.instantiate(hbaseConf); hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab); @@ -180,7 +178,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi); + final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi); User user = User.create(ugi); @@ -208,9 +206,9 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus } @Override - public void renew(Map<String, String> credentials, Map topologyConf) { + public void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String ownerPrincipal) { //HBASE tokens are not renewable so we always have to get new ones. - populateCredentials(credentials, topologyConf); + populateCredentials(credentials, topologyConf, ownerPrincipal); } protected String getCredentialKey() { @@ -220,24 +218,34 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { - Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. [email protected] + Map<String, Object> conf = new HashMap<>(); + final String topologyOwnerPrincipal = args[0]; //with realm e.g. [email protected] conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal [email protected] conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab AutoHBase autoHBase = new AutoHBase(); autoHBase.prepare(conf); - Map<String,String> creds = new HashMap<String, String>(); - autoHBase.populateCredentials(creds, conf); + Map<String,String> creds = new HashMap<>(); + autoHBase.populateCredentials(creds, conf, topologyOwnerPrincipal); LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds)); Subject s = new Subject(); autoHBase.populateSubject(s, creds); LOG.info("Got a Subject " + s); - autoHBase.renew(creds, conf); + autoHBase.renew(creds, conf, topologyOwnerPrincipal); LOG.info("renewed credentials" + autoHBase.getCredentials(creds)); } + + @Override + public void populateCredentials(Map<String, String> credentials, Map topoConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } + + @Override + public void renew(Map<String, String> credentials, Map topologyConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java index ff3f9cc..5f370b8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java @@ -18,7 +18,6 @@ package org.apache.storm.hdfs.common.security; -import org.apache.storm.Config; import org.apache.storm.security.INimbusCredentialPlugin; import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.security.auth.ICredentialsRenewer; @@ -71,9 +70,9 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC } @Override - public void populateCredentials(Map<String, String> credentials, Map conf) { + public void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf, String topologyOwnerPrincipal) { try { - credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf))); + credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(topoConf, topologyOwnerPrincipal))); LOG.info("HDFS tokens added to credentials map."); } catch (Exception e) { LOG.error("Could not populate HDFS credentials.", e); @@ -85,8 +84,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes())); } - /* - * + /** * @param credentials map with creds. * @return instance of org.apache.hadoop.security.Credentials. * this class's populateCredentials must have been called before. @@ -167,7 +165,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC */ @Override @SuppressWarnings("unchecked") - public void renew(Map<String, String> credentials, Map topologyConf) { + public void renew(Map<String, String> credentials,Map<String, Object> topologyConf, String topologyOwnerPrincipal) { try { Credentials credential = getCredentials(credentials); if (credential != null) { @@ -189,26 +187,24 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC } catch (Exception e) { LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " + "renewal period so attempting to get new tokens.", e); - populateCredentials(credentials, topologyConf); + populateCredentials(credentials, topologyConf, topologyOwnerPrincipal); } } @SuppressWarnings("unchecked") - protected byte[] getHadoopCredentials(Map conf) { + protected byte[] getHadoopCredentials(Map conf, final String topologyOwnerPrincipal) { try { if(UserGroupInformation.isSecurityEnabled()) { final Configuration configuration = new Configuration(); login(configuration); - final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL); - final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString()) : FileSystem.getDefaultUri(configuration); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi); + final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi); Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() { @Override @@ -218,7 +214,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC Credentials credential= proxyUser.getCredentials(); fileSystem.addDelegationTokens(hdfsPrincipal, credential); - LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser); + LOG.info("Delegation tokens acquired for user {}", topologyOwnerPrincipal); return credential; } catch (IOException e) { throw new RuntimeException(e); @@ -257,8 +253,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { - Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. [email protected] + Map<String, Object> conf = new HashMap(); + final String topologyOwnerPrincipal = args[0]; //with realm e.g. [email protected] conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. [email protected] conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab @@ -266,16 +262,26 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC AutoHDFS autoHDFS = new AutoHDFS(); autoHDFS.prepare(conf); - Map<String,String> creds = new HashMap<String, String>(); - autoHDFS.populateCredentials(creds, conf); + Map<String,String> creds = new HashMap<>(); + autoHDFS.populateCredentials(creds, conf, topologyOwnerPrincipal); LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds)); Subject s = new Subject(); autoHDFS.populateSubject(s, creds); LOG.info("Got a Subject "+ s); - autoHDFS.renew(creds, conf); + autoHDFS.renew(creds, conf, topologyOwnerPrincipal); LOG.info("renewed credentials", autoHDFS.getCredentials(creds)); } + + @Override + public void populateCredentials(Map<String, String> credentials, Map topoConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } + + @Override + public void renew(Map<String, String> credentials, Map topologyConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj index bb2dc87..828d425 100644 --- a/storm-core/src/clj/org/apache/storm/converter.clj +++ b/storm-core/src/clj/org/apache/storm/converter.clj @@ -69,6 +69,8 @@ (.set_mem_off_heap (second resources)) (.set_cpu (last resources)))]) (:worker->resources assignment))))) + (if (:owner assignment) + (.set_owner thrift-assignment (:owner assignment))) thrift-assignment)) (defn clojurify-executor->node_port [executor->node_port] @@ -98,7 +100,8 @@ (clojurify-executor->node_port (into {} (.get_executor_node_port assignment))) (map-key (fn [executor] (into [] executor)) (into {} (.get_executor_start_time_secs assignment))) - (clojurify-worker->resources (into {} (.get_worker_resources assignment)))))) + (clojurify-worker->resources (into {} (.get_worker_resources assignment))) + (.get_owner assignment)))) (defn convert-to-symbol-from-status [status] (condp = status @@ -188,7 +191,8 @@ (.set_owner (:owner storm-base)) (.set_topology_action_options (thriftify-topology-action-options storm-base)) (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base))) - (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base))))) + (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base))) + (.set_principal (:principal storm-base)))) (defn clojurify-storm-base [^StormBase storm-base] (if storm-base @@ -201,7 +205,8 @@ (.get_owner storm-base) (clojurify-topology-action-options (.get_topology_action_options storm-base)) (convert-to-symbol-from-status (.get_prev_status storm-base)) - (map-val clojurify-debugoptions (.get_component_debug storm-base))))) + (map-val clojurify-debugoptions (.get_component_debug storm-base)) + (.get_principal storm-base)))) (defn thriftify-stats [stats] (if stats http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index db01727..9ee2261 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -66,11 +66,11 @@ ;; the task id is the virtual port ;; node->host is here so that tasks know who to talk to just from assignment ;; this avoid situation where node goes down and task doesn't know what to do information-wise -(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources]) +(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources owner]) ;; component->executors is a map from spout/bolt id to number of executors for that component -(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug]) +(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug principal]) (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map]) http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 3bd8a17..7607b1b 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -16,6 +16,7 @@ (ns org.apache.storm.daemon.nimbus (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]) (:import [org.apache.storm.generated KeyNotFoundException]) + (:import [org.apache.storm.security INimbusCredentialPlugin]) (:import [org.apache.storm.blobstore LocalFsBlobStore]) (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory]) (:import [org.apache.thrift.exception]) @@ -540,12 +541,23 @@ (Utils/fromCompressedJsonConf (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject)))) +(defn fixup-storm-base + [storm-base topo-conf] + (assoc storm-base + :owner (.get topo-conf TOPOLOGY-SUBMITTER-USER) + :principal (.get topo-conf TOPOLOGY-SUBMITTER-PRINCIPAL))) + (defn read-topology-details [nimbus storm-id] (let [blob-store (:blob-store nimbus) storm-base (or (.storm-base (:storm-cluster-state nimbus) storm-id nil) (throw (NotAliveException. storm-id))) topology-conf (read-storm-conf-as-nimbus storm-id blob-store) + storm-base (if (nil? (:principal storm-base)) + (let [new-sb (fixup-storm-base storm-base topology-conf)] + (.update-storm! (:storm-cluster-state nimbus) storm-id new-sb) + new-sb) + storm-base) topology (read-storm-topology-as-nimbus storm-id blob-store) executor->component (->> (compute-executor->component nimbus storm-id) (map-key (fn [[start-task end-task]] @@ -555,7 +567,8 @@ topology (:num-workers storm-base) executor->component - (:launch-time-secs storm-base)))) + (:launch-time-secs storm-base) + (:owner storm-base)))) ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and @@ -959,6 +972,11 @@ (defn- to-worker-slot [[node port]] (WorkerSlot. node port)) +(defn- fixup-assignment + [assignment td] + (assoc assignment + :owner (.getTopologySubmitter td))) + ;; get existing assignment (just the executor->node+port map) -> default to {} ;; filter out ones which have a executor timeout ;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5) @@ -970,9 +988,9 @@ storm-cluster-state (:storm-cluster-state nimbus) ^INimbus inimbus (:inimbus nimbus) ;; read all the topologies - topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)] + tds (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)] {tid (read-topology-details nimbus tid)}))) - topologies (Topologies. topologies) + topologies (Topologies. tds) ;; read all the assignments assigned-topology-ids (.assignments storm-cluster-state nil) existing-assignments (into {} (for [tid assigned-topology-ids] @@ -980,7 +998,14 @@ ;; we exclude its assignment, meaning that all the slots occupied by its assignment ;; will be treated as free slot in the scheduler code. (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id)) - {tid (.assignment-info storm-cluster-state tid nil)})))] + (let [assignment (.assignment-info storm-cluster-state tid nil) + td (.get tds tid) + assignment (if (and (not (:owner assignment)) (not (nil? td))) + (let [new-assignment (fixup-assignment assignment td)] + (.set-assignment! storm-cluster-state tid new-assignment) + new-assignment) + assignment)] + {tid assignment}))))] ;; make the new assignments for topologies (locking (:sched-lock nimbus) (let [ new-scheduler-assignments (compute-new-scheduler-assignments @@ -1019,7 +1044,8 @@ (select-keys all-node->host all-nodes) executor->node+port start-times - worker->resources)}))] + worker->resources + (.getTopologySubmitter (.get tds topology-id)))}))] (when (not= new-assignments existing-assignments) (log-debug "RESETTING id->resources and id->worker-resources cache!") @@ -1052,7 +1078,7 @@ (catch Exception e (log-warn-error e "Ignoring exception from Topology action notifier for storm-Id " storm-id)))))) -(defn- start-storm [nimbus storm-name storm-id topology-initial-status] +(defn- start-storm [nimbus storm-name storm-id topology-initial-status owner principal] {:pre [(#{:active :inactive} topology-initial-status)]} (let [storm-cluster-state (:storm-cluster-state nimbus) conf (:conf nimbus) @@ -1068,18 +1094,13 @@ {:type topology-initial-status} (storm-conf TOPOLOGY-WORKERS) num-executors - (storm-conf TOPOLOGY-SUBMITTER-USER) + owner nil nil - {})) + {} + principal)) (notify-topology-action-listener nimbus storm-name "activate"))) -;; Master: -;; job submit: -;; 1. read which nodes are available -;; 2. set assignments -;; 3. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off) - (defn storm-active? [storm-cluster-state storm-name] (not-nil? (get-storm-id storm-cluster-state storm-name))) @@ -1406,12 +1427,18 @@ (doseq [id assigned-ids] (locking update-lock (let [orig-creds (.credentials storm-cluster-state id nil) + storm-base (.storm-base storm-cluster-state id nil) topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)] (if orig-creds (let [new-creds (HashMap. orig-creds)] (doseq [renewer renewers] (log-message "Renewing Creds For " id " with " renewer) - (.renew renewer new-creds (Collections/unmodifiableMap topology-conf))) + ;;Instead of trying to use reflection to make this work, lets just catch the error + ;; when it does not work + (try + (.renew renewer new-creds (Collections/unmodifiableMap topology-conf) (:principal storm-base)) + (catch clojure.lang.ArityException e + (.renew renewer new-creds (Collections/unmodifiableMap topology-conf))))) (when-not (= orig-creds new-creds) (.set-credentials! storm-cluster-state id new-creds topology-conf) )))))))) @@ -1676,9 +1703,11 @@ submitter-user (.toLocal principal-to-local principal) system-user (System/getProperty "user.name") topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user))) + submitter-principal (if submitter-principal submitter-principal "") + submitter-user (if submitter-user submitter-user system-user) storm-conf (-> storm-conf-submitted - (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal "")) - (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as + (assoc TOPOLOGY-SUBMITTER-PRINCIPAL submitter-principal) + (assoc TOPOLOGY-SUBMITTER-USER submitter-user) ;Don't let the user set who we launch as (assoc TOPOLOGY-USERS topo-acl) (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL))) storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) @@ -1688,7 +1717,11 @@ topology (normalize-topology total-storm-conf topology) storm-cluster-state (:storm-cluster-state nimbus)] (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)] - (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))) + ;;Instead of using reflection just recover from the error it not being there throws + (try + (.populateCredentials ^INimbusCredentialPlugin nimbus-autocred-plugin ^Map credentials ^Map (Collections/unmodifiableMap storm-conf) ^String submitter-principal) + (catch clojure.lang.ArityException e + (.populateCredentials ^INimbusCredentialPlugin nimbus-autocred-plugin ^Map credentials ^Map (Collections/unmodifiableMap storm-conf)))))) (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) (throw (AuthorizationException. "Could not determine the user to run this topology as."))) (system-topology! total-storm-conf topology) ;; this validates the structure of the topology @@ -1717,7 +1750,7 @@ (notify-topology-action-listener nimbus storm-name "submitTopology") (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] - (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))))) + (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)) submitter-user submitter-principal)))) (catch Throwable e (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") (throw e)))) http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 972d778..7f834fb 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -570,6 +570,10 @@ public class StormClusterStateImpl implements IStormClusterState { if (StringUtils.isBlank(newElems.get_owner())) { newElems.set_owner(stormBase.get_owner()); } + if (StringUtils.isBlank(newElems.get_principal()) && stormBase.is_set_principal()) { + newElems.set_principal(stormBase.get_principal()); + } + if (newElems.get_topology_action_options() == null) { newElems.set_topology_action_options(stormBase.get_topology_action_options()); } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java index 87d726f..2e0f1ee 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -102,13 +102,13 @@ public class AdvancedFSOps { } @Override - public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { - SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath()); + public void setupStormCodeDir(String user, File path) throws IOException { + SupervisorUtils.setupStormCodeDir(_conf, user, path.getCanonicalPath()); } @Override - public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException { - SupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath()); + public void setupWorkerArtifactsDir(String user, File path) throws IOException { + SupervisorUtils.setupWorkerArtifactsDir(_conf, user, path.getCanonicalPath()); } } @@ -233,21 +233,21 @@ public class AdvancedFSOps { /** * Setup the permissions for the storm code dir - * @param topologyConf the config of the Topology + * @param user the user that owns the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { + public void setupStormCodeDir(String user, File path) throws IOException { //By default this is a NOOP } /** * Setup the permissions for the worker artifacts dirs - * @param topologyConf the config of the Topology + * @param user the user that owns the topology * @param path the directory to set the permissions on * @throws IOException on any error */ - public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException { + public void setupWorkerArtifactsDir(String user, File path) throws IOException { //By default this is a NOOP } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java index 859ccf1..345454c 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java @@ -317,7 +317,7 @@ public abstract class Container implements Killable { File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); if (!_ops.fileExists(workerArtifacts)) { _ops.forceMkdir(workerArtifacts); - _ops.setupWorkerArtifactsDir(_topoConf, workerArtifacts); + _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts); } String user = getWorkerUser(); @@ -462,8 +462,8 @@ public abstract class Container implements Killable { if (_ops.fileExists(file)) { return _ops.slurpString(file).trim(); - } else if (_topoConf != null) { - return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + } else if (_assignment != null && _assignment.is_set_owner()) { + return _assignment.get_owner(); } if (ConfigUtils.isLocalMode(_conf)) { return System.getProperty("user.name"); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java index 8fefe62..8857ef9 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -276,6 +276,9 @@ public class ReadClusterState implements Runnable, AutoCloseable { if (slotsResources.containsKey(port)) { localAssignment.set_resources(slotsResources.get(port)); } + if (assignment.is_set_owner()) { + localAssignment.set_owner(assignment.get_owner()); + } portTasks.put(port.intValue(), localAssignment); } List<ExecutorInfo> executorInfoList = localAssignment.get_executors(); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java index a6adace..386beb5 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@ -208,8 +208,20 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.readState = new ReadClusterState(this); Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf); - for (String topoId : downloadedTopoIds) { - SupervisorUtils.addBlobReferences(localizer, topoId, conf); + Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap(); + if (portToAssignments != null) { + Map<String, LocalAssignment> assignments = new HashMap<>(); + for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) { + assignments.put(la.get_topology_id(), la); + } + for (String topoId : downloadedTopoIds) { + LocalAssignment la = assignments.get(topoId); + if (la != null) { + SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner()); + } else { + LOG.warn("Could not find an owner for topo {}", topoId); + } + } } // do this after adding the references so we don't try to clean things being used localizer.startCleaner(); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 19d3b78..ef4b54d 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -94,23 +94,23 @@ public class SupervisorUtils { return ret; } - public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException { + public static void setupStormCodeDir(Map<String, Object> conf, String user, String dir) throws IOException { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { String logPrefix = "Storm Code Dir Setup for " + dir; List<String> commands = new ArrayList<>(); commands.add("code-dir"); commands.add(dir); - processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + processLauncherAndWait(conf, user, commands, null, logPrefix); } } - public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException { + public static void setupWorkerArtifactsDir(Map<String, Object> conf, String user, String dir) throws IOException { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { String logPrefix = "Worker Artifacts Setup for " + dir; List<String> commands = new ArrayList<>(); commands.add("artifacts-dir"); commands.add(dir); - processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + processLauncherAndWait(conf, user, commands, null, logPrefix); } } @@ -161,10 +161,9 @@ public class SupervisorUtils { * @param stormId * @param conf */ - static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException { + static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException { Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); if (blobstoreMap != null) { http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java index 0b6d996..746578e 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@ -17,6 +17,7 @@ */ package org.apache.storm.daemon.supervisor.timer; +import java.util.HashMap; import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -59,15 +60,16 @@ public class UpdateBlobs implements Runnable { Map<String, Object> conf = supervisor.getConf(); Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf); AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment(); - Set<String> assignedStormIds = new HashSet<>(); + Map<String, LocalAssignment> assignedStormIds = new HashMap<>(); for (LocalAssignment localAssignment : newAssignment.get().values()) { - assignedStormIds.add(localAssignment.get_topology_id()); + assignedStormIds.put(localAssignment.get_topology_id(), localAssignment); } for (String stormId : downloadedStormIds) { - if (assignedStormIds.contains(stormId)) { + LocalAssignment la = assignedStormIds.get(stormId); + if (la != null) { String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot); - updateBlobsForTopology(conf, stormId, supervisor.getLocalizer()); + updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner()); } } } catch (Exception e) { @@ -89,10 +91,9 @@ public class UpdateBlobs implements Runnable { * @param localizer * @throws IOException */ - private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException { - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException { + Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); try { localizer.updateBlobs(localresources, user); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/Assignment.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java index 4c973d5..394f934 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java +++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java @@ -60,6 +60,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen private static final org.apache.thrift.protocol.TField EXECUTOR_NODE_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_node_port", org.apache.thrift.protocol.TType.MAP, (short)3); private static final org.apache.thrift.protocol.TField EXECUTOR_START_TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_start_time_secs", org.apache.thrift.protocol.TType.MAP, (short)4); private static final org.apache.thrift.protocol.TField WORKER_RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_resources", org.apache.thrift.protocol.TType.MAP, (short)5); + private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)7); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -72,6 +73,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen private Map<List<Long>,NodeInfo> executor_node_port; // optional private Map<List<Long>,Long> executor_start_time_secs; // optional private Map<NodeInfo,WorkerResources> worker_resources; // optional + private String owner; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -79,7 +81,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen NODE_HOST((short)2, "node_host"), EXECUTOR_NODE_PORT((short)3, "executor_node_port"), EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs"), - WORKER_RESOURCES((short)5, "worker_resources"); + WORKER_RESOURCES((short)5, "worker_resources"), + OWNER((short)7, "owner"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -104,6 +107,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return EXECUTOR_START_TIME_SECS; case 5: // WORKER_RESOURCES return WORKER_RESOURCES; + case 7: // OWNER + return OWNER; default: return null; } @@ -144,7 +149,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } // isset id assignments - private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES}; + private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES,_Fields.OWNER}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -168,6 +173,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class), new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class)))); + tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class, metaDataMap); } @@ -246,6 +253,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } this.worker_resources = __this__worker_resources; } + if (other.is_set_owner()) { + this.owner = other.owner; + } } public Assignment deepCopy() { @@ -263,6 +273,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen this.worker_resources = new HashMap<NodeInfo,WorkerResources>(); + this.owner = null; } public String get_master_code_dir() { @@ -424,6 +435,29 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } } + public String get_owner() { + return this.owner; + } + + public void set_owner(String owner) { + this.owner = owner; + } + + public void unset_owner() { + this.owner = null; + } + + /** Returns true if field owner is set (has been assigned a value) and false otherwise */ + public boolean is_set_owner() { + return this.owner != null; + } + + public void set_owner_isSet(boolean value) { + if (!value) { + this.owner = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case MASTER_CODE_DIR: @@ -466,6 +500,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } break; + case OWNER: + if (value == null) { + unset_owner(); + } else { + set_owner((String)value); + } + break; + } } @@ -486,6 +528,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case WORKER_RESOURCES: return get_worker_resources(); + case OWNER: + return get_owner(); + } throw new IllegalStateException(); } @@ -507,6 +552,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return is_set_executor_start_time_secs(); case WORKER_RESOURCES: return is_set_worker_resources(); + case OWNER: + return is_set_owner(); } throw new IllegalStateException(); } @@ -569,6 +616,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return false; } + boolean this_present_owner = true && this.is_set_owner(); + boolean that_present_owner = true && that.is_set_owner(); + if (this_present_owner || that_present_owner) { + if (!(this_present_owner && that_present_owner)) + return false; + if (!this.owner.equals(that.owner)) + return false; + } + return true; } @@ -601,6 +657,11 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (present_worker_resources) list.add(worker_resources); + boolean present_owner = true && (is_set_owner()); + list.add(present_owner); + if (present_owner) + list.add(owner); + return list.hashCode(); } @@ -662,6 +723,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_owner()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -729,6 +800,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } first = false; } + if (is_set_owner()) { + if (!first) sb.append(", "); + sb.append("owner:"); + if (this.owner == null) { + sb.append("null"); + } else { + sb.append(this.owner); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -887,6 +968,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // OWNER + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -979,6 +1068,13 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldEnd(); } } + if (struct.owner != null) { + if (struct.is_set_owner()) { + oprot.writeFieldBegin(OWNER_FIELD_DESC); + oprot.writeString(struct.owner); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1010,7 +1106,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (struct.is_set_worker_resources()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.is_set_owner()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.is_set_node_host()) { { oprot.writeI32(struct.node_host.size()); @@ -1063,6 +1162,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } } } + if (struct.is_set_owner()) { + oprot.writeString(struct.owner); + } } @Override @@ -1070,7 +1172,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen TTupleProtocol iprot = (TTupleProtocol) prot; struct.master_code_dir = iprot.readString(); struct.set_master_code_dir_isSet(true); - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map652 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); @@ -1152,6 +1254,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen } struct.set_worker_resources_isSet(true); } + if (incoming.get(4)) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java index b48d342..5071499 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java +++ b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java @@ -58,6 +58,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, private static final org.apache.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift.protocol.TField("executors", org.apache.thrift.protocol.TType.LIST, (short)2); private static final org.apache.thrift.protocol.TField RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("resources", org.apache.thrift.protocol.TType.STRUCT, (short)3); + private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)5); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -68,12 +69,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, private String topology_id; // required private List<ExecutorInfo> executors; // required private WorkerResources resources; // optional + private String owner; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { TOPOLOGY_ID((short)1, "topology_id"), EXECUTORS((short)2, "executors"), - RESOURCES((short)3, "resources"); + RESOURCES((short)3, "resources"), + OWNER((short)5, "owner"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -94,6 +97,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return EXECUTORS; case 3: // RESOURCES return RESOURCES; + case 5: // OWNER + return OWNER; default: return null; } @@ -134,7 +139,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } // isset id assignments - private static final _Fields optionals[] = {_Fields.RESOURCES}; + private static final _Fields optionals[] = {_Fields.RESOURCES,_Fields.OWNER}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -145,6 +150,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorInfo.class)))); tmpMap.put(_Fields.RESOURCES, new org.apache.thrift.meta_data.FieldMetaData("resources", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class))); + tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class, metaDataMap); } @@ -178,6 +185,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (other.is_set_resources()) { this.resources = new WorkerResources(other.resources); } + if (other.is_set_owner()) { + this.owner = other.owner; + } } public LocalAssignment deepCopy() { @@ -189,6 +199,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, this.topology_id = null; this.executors = null; this.resources = null; + this.owner = null; } public String get_topology_id() { @@ -275,6 +286,29 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } } + public String get_owner() { + return this.owner; + } + + public void set_owner(String owner) { + this.owner = owner; + } + + public void unset_owner() { + this.owner = null; + } + + /** Returns true if field owner is set (has been assigned a value) and false otherwise */ + public boolean is_set_owner() { + return this.owner != null; + } + + public void set_owner_isSet(boolean value) { + if (!value) { + this.owner = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TOPOLOGY_ID: @@ -301,6 +335,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } break; + case OWNER: + if (value == null) { + unset_owner(); + } else { + set_owner((String)value); + } + break; + } } @@ -315,6 +357,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, case RESOURCES: return get_resources(); + case OWNER: + return get_owner(); + } throw new IllegalStateException(); } @@ -332,6 +377,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return is_set_executors(); case RESOURCES: return is_set_resources(); + case OWNER: + return is_set_owner(); } throw new IllegalStateException(); } @@ -376,6 +423,15 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return false; } + boolean this_present_owner = true && this.is_set_owner(); + boolean that_present_owner = true && that.is_set_owner(); + if (this_present_owner || that_present_owner) { + if (!(this_present_owner && that_present_owner)) + return false; + if (!this.owner.equals(that.owner)) + return false; + } + return true; } @@ -398,6 +454,11 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (present_resources) list.add(resources); + boolean present_owner = true && (is_set_owner()); + list.add(present_owner); + if (present_owner) + list.add(owner); + return list.hashCode(); } @@ -439,6 +500,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_owner()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -484,6 +555,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } first = false; } + if (is_set_owner()) { + if (!first) sb.append(", "); + sb.append("owner:"); + if (this.owner == null) { + sb.append("null"); + } else { + sb.append(this.owner); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -574,6 +655,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // OWNER + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -611,6 +700,13 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, oprot.writeFieldEnd(); } } + if (struct.owner != null) { + if (struct.is_set_owner()) { + oprot.writeFieldBegin(OWNER_FIELD_DESC); + oprot.writeString(struct.owner); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -640,10 +736,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, if (struct.is_set_resources()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.is_set_owner()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); if (struct.is_set_resources()) { struct.resources.write(oprot); } + if (struct.is_set_owner()) { + oprot.writeString(struct.owner); + } } @Override @@ -663,12 +765,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, } } struct.set_executors_isSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { struct.resources = new WorkerResources(); struct.resources.read(iprot); struct.set_resources_isSet(true); } + if (incoming.get(1)) { + struct.owner = iprot.readString(); + struct.set_owner_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/generated/StormBase.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java index 34b2358..e3114fb 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java +++ b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java @@ -64,6 +64,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private static final org.apache.thrift.protocol.TField TOPOLOGY_ACTION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_action_options", org.apache.thrift.protocol.TType.STRUCT, (short)7); private static final org.apache.thrift.protocol.TField PREV_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("prev_status", org.apache.thrift.protocol.TType.I32, (short)8); private static final org.apache.thrift.protocol.TField COMPONENT_DEBUG_FIELD_DESC = new org.apache.thrift.protocol.TField("component_debug", org.apache.thrift.protocol.TType.MAP, (short)9); + private static final org.apache.thrift.protocol.TField PRINCIPAL_FIELD_DESC = new org.apache.thrift.protocol.TField("principal", org.apache.thrift.protocol.TType.STRING, (short)10); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -80,6 +81,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private TopologyActionOptions topology_action_options; // optional private TopologyStatus prev_status; // optional private Map<String,DebugOptions> component_debug; // optional + private String principal; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -99,7 +101,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ * @see TopologyStatus */ PREV_STATUS((short)8, "prev_status"), - COMPONENT_DEBUG((short)9, "component_debug"); + COMPONENT_DEBUG((short)9, "component_debug"), + PRINCIPAL((short)10, "principal"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -132,6 +135,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return PREV_STATUS; case 9: // COMPONENT_DEBUG return COMPONENT_DEBUG; + case 10: // PRINCIPAL + return PRINCIPAL; default: return null; } @@ -175,7 +180,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ private static final int __NUM_WORKERS_ISSET_ID = 0; private static final int __LAUNCH_TIME_SECS_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG}; + private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG,_Fields.PRINCIPAL}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -201,6 +206,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, DebugOptions.class)))); + tmpMap.put(_Fields.PRINCIPAL, new org.apache.thrift.meta_data.FieldMetaData("principal", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormBase.class, metaDataMap); } @@ -261,6 +268,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } this.component_debug = __this__component_debug; } + if (other.is_set_principal()) { + this.principal = other.principal; + } } public StormBase deepCopy() { @@ -280,6 +290,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ this.topology_action_options = null; this.prev_status = null; this.component_debug = null; + this.principal = null; } public String get_name() { @@ -525,6 +536,29 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } } + public String get_principal() { + return this.principal; + } + + public void set_principal(String principal) { + this.principal = principal; + } + + public void unset_principal() { + this.principal = null; + } + + /** Returns true if field principal is set (has been assigned a value) and false otherwise */ + public boolean is_set_principal() { + return this.principal != null; + } + + public void set_principal_isSet(boolean value) { + if (!value) { + this.principal = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case NAME: @@ -599,6 +633,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } break; + case PRINCIPAL: + if (value == null) { + unset_principal(); + } else { + set_principal((String)value); + } + break; + } } @@ -631,6 +673,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ case COMPONENT_DEBUG: return get_component_debug(); + case PRINCIPAL: + return get_principal(); + } throw new IllegalStateException(); } @@ -660,6 +705,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return is_set_prev_status(); case COMPONENT_DEBUG: return is_set_component_debug(); + case PRINCIPAL: + return is_set_principal(); } throw new IllegalStateException(); } @@ -758,6 +805,15 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return false; } + boolean this_present_principal = true && this.is_set_principal(); + boolean that_present_principal = true && that.is_set_principal(); + if (this_present_principal || that_present_principal) { + if (!(this_present_principal && that_present_principal)) + return false; + if (!this.principal.equals(that.principal)) + return false; + } + return true; } @@ -810,6 +866,11 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ if (present_component_debug) list.add(component_debug); + boolean present_principal = true && (is_set_principal()); + list.add(present_principal); + if (present_principal) + list.add(principal); + return list.hashCode(); } @@ -911,6 +972,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_principal()).compareTo(other.is_set_principal()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_principal()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.principal, other.principal); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1006,6 +1077,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } first = false; } + if (is_set_principal()) { + if (!first) sb.append(", "); + sb.append("principal:"); + if (this.principal == null) { + sb.append("null"); + } else { + sb.append(this.principal); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -1161,6 +1242,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 10: // PRINCIPAL + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.principal = iprot.readString(); + struct.set_principal_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1243,6 +1332,13 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ oprot.writeFieldEnd(); } } + if (struct.principal != null) { + if (struct.is_set_principal()) { + oprot.writeFieldBegin(PRINCIPAL_FIELD_DESC); + oprot.writeString(struct.principal); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1282,7 +1378,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ if (struct.is_set_component_debug()) { optionals.set(5); } - oprot.writeBitSet(optionals, 6); + if (struct.is_set_principal()) { + optionals.set(6); + } + oprot.writeBitSet(optionals, 7); if (struct.is_set_component_executors()) { { oprot.writeI32(struct.component_executors.size()); @@ -1315,6 +1414,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } } } + if (struct.is_set_principal()) { + oprot.writeString(struct.principal); + } } @Override @@ -1326,7 +1428,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ struct.set_status_isSet(true); struct.num_workers = iprot.readI32(); struct.set_num_workers_isSet(true); - BitSet incoming = iprot.readBitSet(6); + BitSet incoming = iprot.readBitSet(7); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map686 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I32, iprot.readI32()); @@ -1375,6 +1477,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._ } struct.set_component_debug_isSet(true); } + if (incoming.get(6)) { + struct.principal = iprot.readString(); + struct.set_principal_isSet(true); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java index ae5103f..2a9ec24 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java @@ -101,10 +101,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBaseBlobsDistributed implements Callable<Void> { protected final String _topologyId; protected final File _stormRoot; - - public DownloadBaseBlobsDistributed(String topologyId) throws IOException { + protected final String owner; + + public DownloadBaseBlobsDistributed(String topologyId, String owner) throws IOException { _topologyId = topologyId; _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId)); + this.owner = owner; } protected void downloadBaseBlobs(File tmproot) throws Exception { @@ -145,7 +147,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { try { downloadBaseBlobs(tr); _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot); - _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot); + _fsOps.setupStormCodeDir(owner, _stormRoot); deleteAll = false; } finally { if (deleteAll) { @@ -164,8 +166,8 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed { - public DownloadBaseBlobsLocal(String topologyId) throws IOException { - super(topologyId); + public DownloadBaseBlobsLocal(String topologyId, String owner) throws IOException { + super(topologyId, owner); } @Override @@ -210,21 +212,22 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBlobs implements Callable<Void> { private final String _topologyId; + private final String topoOwner; - public DownloadBlobs(String topologyId) { + public DownloadBlobs(String topologyId, String topoOwner) { _topologyId = topologyId; + this.topoOwner = topoOwner; } @Override public Void call() throws Exception { try { String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId); - Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId); + Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId); @SuppressWarnings("unchecked") - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); List<LocalResource> localResourceList = new ArrayList<>(); if (blobstoreMap != null) { @@ -247,12 +250,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { } if (!localResourceList.isEmpty()) { - File userDir = _localizer.getLocalUserFileCacheDir(user); + File userDir = _localizer.getLocalUserFileCacheDir(topoOwner); if (!_fsOps.fileExists(userDir)) { _fsOps.forceMkdir(userDir); } - List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir); - _fsOps.setupBlobPermissions(userDir, user); + List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir); + _fsOps.setupBlobPermissions(userDir, topoOwner); if (!_symlinksDisabled) { for (LocalizedResource localizedResource : localizedResources) { String keyName = localizedResource.getKey(); @@ -310,9 +313,9 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { if (localResource == null) { Callable<Void> c; if (_isLocalMode) { - c = new DownloadBaseBlobsLocal(topologyId); + c = new DownloadBaseBlobsLocal(topologyId, assignment.get_owner()); } else { - c = new DownloadBaseBlobsDistributed(topologyId); + c = new DownloadBaseBlobsDistributed(topologyId, assignment.get_owner()); } localResource = new LocalDownloadedResource(_execService.submit(c)); _basicPending.put(topologyId, localResource); @@ -363,7 +366,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { final String topologyId = assignment.get_topology_id(); LocalDownloadedResource localResource = _blobPending.get(topologyId); if (localResource == null) { - Callable<Void> c = new DownloadBlobs(topologyId); + Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner()); localResource = new LocalDownloadedResource(_execService.submit(c)); _blobPending.put(topologyId, localResource); } @@ -386,7 +389,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { @SuppressWarnings("unchecked") Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); if (blobstoreMap != null) { - String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String user = assignment.get_owner(); String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java index a0eb4ad..a394e39 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler; import java.util.ArrayList; @@ -37,13 +38,12 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TopologyDetails { - private String topologyId; - private Map topologyConf; - private StormTopology topology; - private Map<ExecutorDetails, String> executorToComponent; - private int numWorkers; + private final String topologyId; + private final Map topologyConf; + private final StormTopology topology; + private final Map<ExecutorDetails, String> executorToComponent; + private final int numWorkers; //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>> private Map<ExecutorDetails, Map<String, Double>> resourceList; //Max heap size for a worker used by topology @@ -51,21 +51,23 @@ public class TopologyDetails { //topology priority private Integer topologyPriority; //when topology was launched - private int launchTime; + private final int launchTime; + private final String owner; private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class); - public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) { - this(topologyId, topologyConf, topology, numWorkers, null, 0); + public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, String owner) { + this(topologyId, topologyConf, topology, numWorkers, null, 0, owner); } public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, - int numWorkers, Map<ExecutorDetails, String> executorToComponents) { - this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0); + int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) { + this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner); } public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, - int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) { + int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) { + this.owner = owner; this.topologyId = topologyId; this.topologyConf = topologyConf; this.topology = topology; @@ -464,12 +466,7 @@ public class TopologyDetails { * Get the user that submitted this topology */ public String getTopologySubmitter() { - String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER); - if (user == null || user.equals("")) { - LOG.debug("Topology {} submitted by anonymous user", this.getName()); - user = System.getProperty("user.name"); - } - return user; + return owner; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 60a5259..01cfb91 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -79,7 +79,7 @@ public class MultitenantScheduler implements IScheduler { defaultPool.init(cluster, nodeIdToNode); for (TopologyDetails td: topologies.getTopologies()) { - String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER); + String user = td.getTopologySubmitter(); LOG.debug("Found top {} run by user {}",td.getId(), user); NodePool pool = userPools.get(user); if (pool == null || !pool.canAdd(td)) {
