http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java index 4de0cfa..bbb1e6c 100644 --- a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java +++ b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java @@ -20,6 +20,7 @@ package org.apache.storm.security; import org.apache.storm.daemon.Shutdownable; import java.util.Map; +import org.apache.storm.generated.StormTopology; /** * Nimbus auto credential plugin that will be called on nimbus host @@ -29,8 +30,8 @@ import java.util.Map; public interface INimbusCredentialPlugin extends Shutdownable { /** - * this method will be called when nimbus initializes. - * @param conf + * This method will be called when nimbus initializes. + * @param conf the cluster config */ void prepare(Map conf); @@ -39,9 +40,22 @@ public interface INimbusCredentialPlugin extends Shutdownable { * at least once during the submit Topology action. It will be not be called during activate instead * the credentials return by this method will be merged with the other credentials in the topology * and stored in zookeeper. + * NOTE: THIS METHOD WILL BE CALLED THROUGH REFLECTION. Existing compiled implementations will still + * work but new implementations will not compile. A NOOP implementation can be added to make it compile. * @param credentials credentials map where more credentials will be added. 
- * @param conf topology configuration - * @return */ - void populateCredentials(Map<String, String> credentials, Map conf); + * @param topoConf topology configuration */ + @Deprecated + void populateCredentials(Map<String, String> credentials, Map topoConf); + + /** + * Method that will be called on nimbus as part of submit topology. This plugin will be called + * at least once during the submit Topology action. It will not be called during activate; instead + * the credentials returned by this method will be merged with the other credentials in the topology + * and stored in zookeeper. + * @param credentials credentials map where more credentials will be added. + * @param topoConf topology configuration + * @param topologyOwnerPrincipal the full principal name of the owner of the topology + */ + void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf, final String topologyOwnerPrincipal); }
http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java index 1f947aa..43690b3 100644 --- a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java +++ b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java @@ -35,6 +35,20 @@ public interface ICredentialsRenewer { * Renew any credentials that need to be renewed. (Update the credentials if needed) * @param credentials the credentials that may have something to renew. * @param topologyConf topology configuration. - */ - public void renew(Map<String, String> credentials, Map topologyConf); + * @param topologyOwnerPrincipal the full principal name of the owner of the topology + */ + @SuppressWarnings("deprecation") + void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal); + + /** + * Renew any credentials that need to be renewed. (Update the credentials if needed) + * NOTE: THIS WILL BE CALLED THROUGH REFLECTION. So if the newer renew exists it will be called instead, + * but if it does not exist this will be called. That means that this is binary compatible but not source + * compatible with older version. To make the compilation work this can become a noop when the new API + * is implemented. + * @param credentials the credentials that may have something to renew. + * @param topologyConf topology configuration. 
+ */ + @Deprecated + void renew(Map<String, String> credentials, Map topologyConf); } http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java index d02c4e3..3d78d48 100644 --- a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java +++ b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java @@ -202,7 +202,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } @Override - public void renew(Map<String,String> credentials, Map topologyConf) { + public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) { KerberosTicket tgt = getTGT(credentials); if (tgt != null) { long refreshTime = getRefreshTime(tgt); @@ -219,6 +219,10 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } } + public void renew(Map<String, String> credentials, Map topologyConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } + public static void main(String[] args) throws Exception { AutoTGT at = new AutoTGT(); Map conf = new java.util.HashMap(); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index 8262285..8091910 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -9113,6 +9113,7 @@ class Assignment: - executor_node_port - executor_start_time_secs - worker_resources + - owner """ thrift_spec = ( @@ -9126,9 +9127,11 @@ class Assignment: }, ), # 4 (5, TType.MAP, 'worker_resources', (TType.STRUCT,(NodeInfo, 
NodeInfo.thrift_spec),TType.STRUCT,(WorkerResources, WorkerResources.thrift_spec)), { }, ), # 5 + None, # 6 + (7, TType.STRING, 'owner', None, None, ), # 7 ) - def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4],): + def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4], owner=None,): self.master_code_dir = master_code_dir if node_host is self.thrift_spec[2][4]: node_host = { @@ -9146,6 +9149,7 @@ class Assignment: worker_resources = { } self.worker_resources = worker_resources + self.owner = owner def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9218,6 +9222,11 @@ class Assignment: iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9270,6 +9279,10 @@ class Assignment: viter601.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 7) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9286,6 +9299,7 @@ class Assignment: value = (value * 31) ^ hash(self.executor_node_port) value = (value * 31) ^ hash(self.executor_start_time_secs) value = (value * 31) ^ hash(self.worker_resources) + value = (value * 31) ^ hash(self.owner) return value def __repr__(self): @@ -9391,6 +9405,7 @@ class StormBase: - topology_action_options - prev_status - component_debug + - principal """ thrift_spec = ( @@ -9404,9 +9419,10 @@ class StormBase: (7, 
TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7 (8, TType.I32, 'prev_status', None, None, ), # 8 (9, TType.MAP, 'component_debug', (TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), None, ), # 9 + (10, TType.STRING, 'principal', None, None, ), # 10 ) - def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None,): + def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None, principal=None,): self.name = name self.status = status self.num_workers = num_workers @@ -9416,6 +9432,7 @@ class StormBase: self.topology_action_options = topology_action_options self.prev_status = prev_status self.component_debug = component_debug + self.principal = principal def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9485,6 +9502,11 @@ class StormBase: iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 10: + if ftype == TType.STRING: + self.principal = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9539,6 +9561,10 @@ class StormBase: viter619.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.principal is not None: + oprot.writeFieldBegin('principal', TType.STRING, 10) + oprot.writeString(self.principal.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9563,6 +9589,7 @@ class StormBase: value = (value * 31) ^ hash(self.topology_action_options) value = (value * 31) ^ hash(self.prev_status) value = (value * 31) ^ hash(self.component_debug) + value = (value 
* 31) ^ hash(self.principal) return value def __repr__(self): @@ -9866,6 +9893,7 @@ class LocalAssignment: - topology_id - executors - resources + - owner """ thrift_spec = ( @@ -9873,12 +9901,15 @@ class LocalAssignment: (1, TType.STRING, 'topology_id', None, None, ), # 1 (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2 (3, TType.STRUCT, 'resources', (WorkerResources, WorkerResources.thrift_spec), None, ), # 3 + None, # 4 + (5, TType.STRING, 'owner', None, None, ), # 5 ) - def __init__(self, topology_id=None, executors=None, resources=None,): + def __init__(self, topology_id=None, executors=None, resources=None, owner=None,): self.topology_id = topology_id self.executors = executors self.resources = resources + self.owner = owner def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9911,6 +9942,11 @@ class LocalAssignment: self.resources.read(iprot) else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9936,6 +9972,10 @@ class LocalAssignment: oprot.writeFieldBegin('resources', TType.STRUCT, 3) self.resources.write(oprot) oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 5) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9952,6 +9992,7 @@ class LocalAssignment: value = (value * 31) ^ hash(self.topology_id) value = (value * 31) ^ hash(self.executors) value = (value * 31) ^ hash(self.resources) + value = (value * 31) ^ hash(self.owner) return value def __repr__(self): http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/src/storm.thrift 
---------------------------------------------------------------------- diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 146591f..6c395cc 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -473,6 +473,8 @@ struct Assignment { 3: optional map<list<i64>, NodeInfo> executor_node_port = {}; 4: optional map<list<i64>, i64> executor_start_time_secs = {}; 5: optional map<NodeInfo, WorkerResources> worker_resources = {}; + //6: from other pull request + 7: optional string owner; } enum TopologyStatus { @@ -497,6 +499,7 @@ struct StormBase { 7: optional TopologyActionOptions topology_action_options; 8: optional TopologyStatus prev_status;//currently only used during rebalance action. 9: optional map<string, DebugOptions> component_debug; // topology/component level debug option. + 10: optional string principal; } struct ClusterWorkerHeartbeat { @@ -519,6 +522,8 @@ struct LocalAssignment { 1: required string topology_id; 2: required list<ExecutorInfo> executors; 3: optional WorkerResources resources; + //4: other pull request + 5: optional string owner; } struct LSSupervisorId { http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index e082768..55b686e 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -171,14 +171,14 @@ (deftest test-storm-cluster-state-basics (with-inprocess-zookeeper zk-port (let [state (mk-storm-state zk-port) - assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {}) - assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {}) + assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {} "") + assignment2 (Assignment. 
"/aaa" {} {[2] ["2" 2002]} {} {} "") nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false) nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false) nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (current-time-secs) false "v1") nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (current-time-secs) false "v2") - base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {}) - base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {})] + base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} "") + base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} "")] (is (= [] (.assignments state nil))) (.set-assignment! state "storm1" assignment1) (is (= assignment1 (.assignment-info state "storm1" nil))) http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj index 0c3ee9b..144f18c 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj @@ -138,7 +138,7 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"})] + executor3 "bolt2"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -175,7 +175,7 @@ 5 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"})] + executor3 "bolt2"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -216,7 +216,7 @@ executor2 "bolt1" executor3 "bolt1" executor4 "bolt1" - executor5 "bolt2"})] + executor5 "bolt2"} "user")] ;; assign one node so it is not in the 
pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -255,7 +255,7 @@ executor2 "bolt1" executor3 "bolt2" executor4 "bolt3" - executor5 "bolt4"})] + executor5 "bolt4"} "user")] (let [node-map (Node/getAllNodesFrom single-cluster) free-pool (FreePool. ) default-pool (DefaultPool. )] @@ -302,7 +302,7 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"}) + executor3 "bolt2"} "user") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) @@ -310,7 +310,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -383,7 +383,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"})] + executor4 "bolt4"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -427,7 +427,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"})] + executor4 "bolt4"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -474,7 +474,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"}) + executor4 "bolt4"} "user") topology2 (TopologyDetails. 
"topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-ISOLATED-MACHINES 2} @@ -483,7 +483,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -579,7 +579,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"}) + executor4 "bolt4"} "user") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-ISOLATED-MACHINES 2} @@ -588,7 +588,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -629,34 +629,31 @@ (deftest test-multitenant-scheduler (let [supers (gen-supervisors 10) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 4 (mk-ed-map [["spout1" 0 5] ["bolt1" 5 10] ["bolt2" 10 15] - ["bolt3" 15 20]])) + ["bolt3" 15 20]]) "userC") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-ISOLATED-MACHINES 2 - TOPOLOGY-SUBMITTER-USER "userA"} + TOPOLOGY-ISOLATED-MACHINES 2} (StormTopology.) 4 (mk-ed-map [["spout11" 0 5] ["bolt12" 5 6] ["bolt13" 6 7] - ["bolt14" 7 10]])) + ["bolt14" 7 10]]) "userA") topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" - TOPOLOGY-ISOLATED-MACHINES 5 - TOPOLOGY-SUBMITTER-USER "userB"} + TOPOLOGY-ISOLATED-MACHINES 5} (StormTopology.) 10 (mk-ed-map [["spout21" 0 10] ["bolt22" 10 20] ["bolt23" 20 30] - ["bolt24" 30 40]])) + ["bolt24" 30 40]]) "userB") cluster (Cluster. (nimbus/standalone-nimbus) supers {} nil) node-map (Node/getAllNodesFrom cluster) topologies (Topologies. 
(to-top-map [topology1 topology2 topology3])) @@ -682,14 +679,13 @@ (deftest test-force-free-slot-in-bad-state (let [supers (gen-supervisors 1) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 4 (mk-ed-map [["spout1" 0 5] ["bolt1" 5 10] ["bolt2" 10 15] - ["bolt3" 15 20]])) + ["bolt3" 15 20]]) "userC") existing-assignments { "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1) (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20) @@ -719,25 +715,22 @@ (testing "Assiging same worker slot to different topologies is bad state" (let [supers (gen-supervisors 5) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 - (mk-ed-map [["spout1" 0 1]])) + (mk-ed-map [["spout1" 0 1]]) "userC") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-ISOLATED-MACHINES 2 - TOPOLOGY-SUBMITTER-USER "userA"} + TOPOLOGY-ISOLATED-MACHINES 2} (StormTopology.) 2 - (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]])) + (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]) "userA") topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" - TOPOLOGY-ISOLATED-MACHINES 1 - TOPOLOGY-SUBMITTER-USER "userB"} + TOPOLOGY-ISOLATED-MACHINES 1} (StormTopology.) 1 - (mk-ed-map [["spout21" 2 3]])) + (mk-ed-map [["spout21" 2 3]]) "userB") worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1) existing-assignments {"topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments}) "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})} @@ -761,11 +754,10 @@ (let [supers (gen-supervisors 1) port-not-reported-by-supervisor 6 topology1 (TopologyDetails. 
"topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 - (mk-ed-map [["spout11" 0 1]])) + (mk-ed-map [["spout11" 0 1]]) "userA") existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 0) (WorkerSlot. "super0" port-not-reported-by-supervisor)})} @@ -787,19 +779,17 @@ dead-supervisor "super1" port-not-reported-by-supervisor 6 topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 2 (mk-ed-map [["spout11" 0 1] - ["bolt12" 1 2]])) + ["bolt12" 1 2]]) "userA") topology2 (TopologyDetails. "topology2" - {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 2 (mk-ed-map [["spout21" 4 5] - ["bolt22" 5 6]])) + ["bolt22" 5 6]]) "userA") worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1) existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" @@ -837,11 +827,10 @@ supers {"super1" super1 "super2" super2} topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA" TOPOLOGY-ISOLATED-MACHINES 1} (StormTopology.) 7 - (mk-ed-map [["spout21" 0 7]])) + (mk-ed-map [["spout21" 0 7]]) "userA") existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 0) (WorkerSlot. 
"super1" 1) http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj index ec51914..6210c33 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj @@ -101,8 +101,8 @@ cluster (Cluster. (nimbus/standalone-nimbus) supers {} {}) topologies (Topologies. (to-top-map [])) node-map (RAS_Nodes/getAllNodesFrom cluster topologies) - topology1 (TopologyDetails. "topology1" {} nil 0) - topology2 (TopologyDetails. "topology2" {} nil 0)] + topology1 (TopologyDetails. "topology1" {} nil 0 "user") + topology2 (TopologyDetails. "topology2" {} nil 0 "user")] (is (= 5 (.size node-map))) (let [node (.get node-map "id0")] (is (= "id0" (.getId node))) @@ -152,7 +152,6 @@ storm-topology (.createTopology builder) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -162,7 +161,7 @@ storm-topology 1 (mk-ed-map [["wordSpout" 0 1] - ["wordCountBolt" 1 2]])) + ["wordCountBolt" 1 2]]) "userC") cluster (Cluster. (nimbus/standalone-nimbus) supers {} {STORM-NETWORK-TOPOGRAPHY-PLUGIN "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"}) @@ -195,7 +194,6 @@ storm-topology1 (.createTopology builder1) topology1 (TopologyDetails. 
"topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -210,14 +208,13 @@ ["wordCountBolt2" 3 4] ["wordCountBolt3" 4 5] ["wordCountBolt4" 5 6] - ["wordCountBolt5" 6 7]])) + ["wordCountBolt5" 6 7]]) "userC") builder2 (TopologyBuilder.) ;; a topology with two unconnected partitions _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1) _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1) storm-topology2 (.createTopology builder1) topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -227,7 +224,7 @@ storm-topology2 1 (mk-ed-map [["wordSpoutX" 0 1] - ["wordSpoutY" 1 2]])) + ["wordSpoutY" 1 2]]) "userC") supers (gen-supervisors 2 4) cluster (Cluster. (nimbus/standalone-nimbus) supers {} {STORM-NETWORK-TOPOGRAPHY-PLUGIN @@ -264,7 +261,6 @@ storm-topology (.createTopology builder) topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -274,7 +270,7 @@ storm-topology 2 (mk-ed-map [["wordSpout" 0 1] - ["wordCountBolt" 1 2]])) + ["wordCountBolt" 1 2]]) "userC") cluster (Cluster. (nimbus/standalone-nimbus) supers {} {STORM-NETWORK-TOPOGRAPHY-PLUGIN "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"}) @@ -305,7 +301,6 @@ storm-topology (.createTopology builder) topology1 (TopologyDetails. 
"topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -315,7 +310,7 @@ storm-topology 2 ;; need two workers, each on one node (mk-ed-map [["wordSpout" 0 2] - ["wordCountBolt" 2 3]])) + ["wordCountBolt" 2 3]]) "userC") cluster (Cluster. (nimbus/standalone-nimbus) supers {} {STORM-NETWORK-TOPOGRAPHY-PLUGIN "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"}) @@ -365,7 +360,6 @@ storm-topology1 (.createTopology builder1) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -374,13 +368,12 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology1 3 ;; three workers to hold three executors - (mk-ed-map [["spout1" 0 3]])) + (mk-ed-map [["spout1" 0 3]]) "userC") builder2 (TopologyBuilder.) _ (.setSpout builder2 "spout2" (TestWordSpout.) 2) storm-topology2 (.createTopology builder2) topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -389,7 +382,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology2 2 ;; two workers, each holds one executor and resides on one node - (mk-ed-map [["spout2" 0 2]])) + (mk-ed-map [["spout2" 0 2]]) "userC") scheduler (ResourceAwareScheduler.)] (testing "When a worker fails, RAS does not alter existing assignments on healthy workers" @@ -512,7 +505,6 @@ storm-topology1 (.createTopology builder1) topology1 (TopologyDetails. 
"topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -521,7 +513,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology1 1 - (mk-ed-map [["spout1" 0 1]])) + (mk-ed-map [["spout1" 0 1]]) "userC") builder2 (TopologyBuilder.) ;; topo2 has 4 large tasks _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4) (.setMemoryLoad 500.0 12.0) @@ -529,7 +521,6 @@ storm-topology2 (.createTopology builder2) topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -538,7 +529,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology2 2 - (mk-ed-map [["spout2" 0 4]])) + (mk-ed-map [["spout2" 0 4]]) "userC") builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G) _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4) (.setMemoryLoad 200.0 56.0) @@ -546,7 +537,6 @@ storm-topology3 (.createTopology builder3) topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -555,7 +545,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology3 2 - (mk-ed-map [["spout3" 0 4]])) + (mk-ed-map [["spout3" 0 4]]) "userC") builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 
2) (.setMemoryLoad 100.0 0.0) @@ -563,7 +553,6 @@ storm-topology4 (.createTopology builder4) topology4 (TopologyDetails. "topology4" {TOPOLOGY-NAME "topology-name-4" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -572,7 +561,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology4 2 - (mk-ed-map [["spout4" 0 12]])) + (mk-ed-map [["spout4" 0 12]]) "userC") builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in teh cluster _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40) (.setMemoryLoad 100.0 28.0) @@ -580,7 +569,6 @@ storm-topology5 (.createTopology builder5) topology5 (TopologyDetails. "topology5" {TOPOLOGY-NAME "topology-name-5" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -589,7 +577,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology5 2 - (mk-ed-map [["spout5" 0 40]])) + (mk-ed-map [["spout5" 0 40]]) "userC") epsilon 0.000001 topologies (Topologies. (to-top-map [topology1 topology2]))] @@ -661,7 +649,6 @@ storm-topology1 (.createTopology builder1) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -670,7 +657,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology1 1 - (mk-ed-map [["spout1" 0 4]])) + (mk-ed-map [["spout1" 0 4]]) "userA") topologies (Topologies. 
(to-top-map [topology1]))] (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY}) @@ -687,7 +674,6 @@ storm-topology1 (.createTopology builder1) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -696,7 +682,7 @@ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY} storm-topology1 1 - (mk-ed-map [["spout1" 0 5]])) + (mk-ed-map [["spout1" 0 5]]) "userC") topologies (Topologies. (to-top-map [topology1]))] (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY}) @@ -715,7 +701,6 @@ _ (.setSpout builder1 "spout1" (TestWordSpout.) 2) storm-topology1 (.createTopology builder1) conf {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC" TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 @@ -726,7 +711,7 @@ conf storm-topology1 1 - (mk-ed-map [["spout1" 0 5]])) + (mk-ed-map [["spout1" 0 5]]) "userC") topologies (Topologies. (to-top-map [topology1]))] (is (thrown? 
IllegalArgumentException (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1))) http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/clj/org/apache/storm/scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj index fc6e8e3..a6c5268 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj @@ -69,13 +69,13 @@ executor2 (ExecutorDetails. (int 6) (int 10)) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 {executor1 "spout1" - executor2 "bolt1"}) + executor2 "bolt1"} "user") ;; test topology.selectExecutorToComponent executor->comp (.selectExecutorToComponent topology1 (list executor1)) _ (is (= (clojurify-executor->comp {executor1 "spout1"}) (clojurify-executor->comp executor->comp))) ;; test topologies.getById - topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {}) + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {} "user") topologies (Topologies. {"topology1" topology1 "topology2" topology2}) _ (is (= "topology1" (->> "topology1" (.getById topologies) @@ -104,19 +104,19 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"}) + executor3 "bolt2"} "user") ;; topology2 is fully scheduled topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 2 {executor11 "spout11" - executor12 "bolt12"}) + executor12 "bolt12"} "user") ;; topology3 needs scheduling, since the assignment is squeezed topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"} (StormTopology.) 2 {executor21 "spout21" - executor22 "bolt22"}) + executor22 "bolt22"} "user") topologies (Topologies. 
{"topology1" topology1 "topology2" topology2 "topology3" topology3}) executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1)) executor2 (WorkerSlot. "supervisor2" (int 2))} http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java index 0958d4d..07427bd 100644 --- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java @@ -160,8 +160,7 @@ public class ContainerTest { final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b"); final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b"); - - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); + topoConf.put(Config.LOGS_GROUPS, logGroups); topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups); topoConf.put(Config.LOGS_USERS, logUsers); @@ -181,6 +180,7 @@ public class ContainerTest { LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, "SUPERVISOR", 8080, la, workerId, topoConf, ops); @@ -233,7 +233,6 @@ public class ContainerTest { final File workerPidsRoot = new File(workerRoot, "pids"); final Map<String, Object> topoConf = new HashMap<>(); - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); @@ -248,6 +247,7 @@ public class ContainerTest { when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump); LocalAssignment la = new LocalAssignment(); + la.set_owner(user); la.set_topology_id(topoId); MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, "SUPERVISOR", 
port, la, workerId, topoConf, ops); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java index ed2b4ff..fe8392a 100644 --- a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -50,8 +50,10 @@ public class AsyncLocalizerTest { @Test public void testRequestDownloadBaseTopologyBlobs() throws Exception { final String topoId = "TOPO"; + final String user = "user"; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); ExecutorInfo ei = new ExecutorInfo(); ei.set_task_start(1); ei.set_task_end(1); @@ -97,7 +99,7 @@ public class AsyncLocalizerTest { //Extracting the dir from the jar verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), eq("resources"), any(File.class)); verify(ops).moveDirectoryPreferAtomic(any(File.class), eq(fStormRoot)); - verify(ops).setupStormCodeDir(topoConf, fStormRoot); + verify(ops).setupStormCodeDir(user, fStormRoot); verify(ops, never()).deleteIfExists(any(File.class)); } finally { @@ -110,15 +112,16 @@ public class AsyncLocalizerTest { @Test public void testRequestDownloadTopologyBlobs() throws Exception { final String topoId = "TOPO-12345"; + final String user = "user"; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); ExecutorInfo ei = new ExecutorInfo(); ei.set_task_start(1); ei.set_task_end(1); la.add_to_executors(ei); final String topoName = "TOPO"; final int port = 8080; - final String user = "user"; final String simpleLocalName = "simple.txt"; final String simpleKey = "simple"; @@ -150,7 +153,6 @@ public class AsyncLocalizerTest { Map<String, 
Object> topoConf = new HashMap<>(conf); topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap); - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); topoConf.put(Config.TOPOLOGY_NAME, topoName); List<LocalizedResource> localizedList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index 44f2136..cecc6d1 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -79,7 +79,6 @@ public class TestResourceAwareScheduler { defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0); defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0); defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0); - defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo"); } @Test @@ -103,7 +102,8 @@ public class TestResourceAwareScheduler { Assert.assertEquals(0, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0); + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0, + "user"); List<ExecutorDetails> executors11 = new ArrayList<>(); executors11.add(new ExecutorDetails(1, 1)); @@ -123,7 +123,8 @@ public class TestResourceAwareScheduler { Assert.assertEquals(2, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 
1, 0, 2, 0, 0, 0); + TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0, + "user"); List<ExecutorDetails> executors21 = new ArrayList<>(); executors21.add(new ExecutorDetails(1, 1)); @@ -165,7 +166,7 @@ public class TestResourceAwareScheduler { Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); - TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0); + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0, "user"); Map<String, TopologyDetails> topoMap = new HashMap<>(); topoMap.put(topology1.getId(), topology1); Topologies topologies = new Topologies(topoMap); @@ -208,14 +209,14 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0, "user"); TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions builder2.setSpout("wordSpoutX", new TestWordSpout(), 1); builder2.setSpout("wordSpoutY", new TestWordSpout(), 1); StormTopology stormTopology2 = builder2.createTopology(); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new 
HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -271,7 +272,7 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -319,7 +320,7 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -409,7 +410,7 @@ public class TestResourceAwareScheduler { Config config1 = new Config(); config1.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0, "user"); TopologyBuilder builder2 = new TopologyBuilder(); builder2.setSpout("wordSpout2", new TestWordSpout(), 2); @@ -419,7 +420,7 @@ public class 
TestResourceAwareScheduler { // memory requirement is large enough so that two executors can not be fully assigned to one node config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0, "user"); // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); @@ -567,7 +568,7 @@ public class TestResourceAwareScheduler { Config config1 = new Config(); config1.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0, "user"); // topo2 has 4 large tasks TopologyBuilder builder2 = new TopologyBuilder(); @@ -576,7 +577,7 @@ public class TestResourceAwareScheduler { Config config2 = new Config(); config2.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user"); // topo3 has 4 large tasks TopologyBuilder builder3 = new TopologyBuilder(); @@ -585,7 +586,7 @@ public class TestResourceAwareScheduler { Config config3 = new Config(); config3.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap3 
= TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3); - TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0); + TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0, "user"); // topo4 has 12 small tasks, whose mem usage does not exactly divide a node's mem capacity TopologyBuilder builder4 = new TopologyBuilder(); @@ -594,7 +595,7 @@ public class TestResourceAwareScheduler { Config config4 = new Config(); config4.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap4 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4); - TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0); + TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0, "user"); // topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster TopologyBuilder builder5 = new TopologyBuilder(); @@ -603,7 +604,7 @@ public class TestResourceAwareScheduler { Config config5 = new Config(); config5.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap5 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5); - TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0); + TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0, "user"); // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); @@ -691,7 +692,7 @@ public class TestResourceAwareScheduler { config1.putAll(defaultTopologyConf); config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); Map<ExecutorDetails, String> executorMap1 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); ResourceAwareScheduler rs = new ResourceAwareScheduler(); Map<String, TopologyDetails> topoMap = new HashMap<>(); @@ -713,7 +714,7 @@ public class TestResourceAwareScheduler { config2.putAll(defaultTopologyConf); config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user"); cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config2); topoMap = new HashMap<>(); topoMap.put(topology2.getId(), topology2); @@ -769,16 +770,17 @@ public class TestResourceAwareScheduler { config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime 
- 2, 10); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); - - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20, + "bobby"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -828,8 +830,6 @@ public class TestResourceAwareScheduler { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 128.0); config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0); - config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER); - Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); resourceUserPool.put("jerry", new HashMap<String, Number>()); resourceUserPool.get("jerry").put("cpu", 1000); @@ -845,11 +845,16 @@ public class TestResourceAwareScheduler { config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30); - TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, + 20, TOPOLOGY_SUBMITTER); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, + 30, TOPOLOGY_SUBMITTER); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, + 30, TOPOLOGY_SUBMITTER); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, + 20, TOPOLOGY_SUBMITTER); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, + 30, TOPOLOGY_SUBMITTER); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -894,7 +899,8 @@ public class TestResourceAwareScheduler { LOG.info("{} - {}", topo.getName(), queue); Assert.assertEquals("check order", topo.getName(), "topo-2"); - TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, 10); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, + 10, TOPOLOGY_SUBMITTER); topoMap.put(topo6.getId(), topo6); topologies = new Topologies(topoMap); @@ -954,29 +960,38 @@ public class TestResourceAwareScheduler { config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 
1, 1, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 29); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); - - TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 29); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); - - TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, 29); + TopologyDetails topo1 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, + 29, "jerry"); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, + 20, "jerry"); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, + 29, "jerry"); + + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20, + "bobby"); + TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29, + "bobby"); + TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, + 29, "bobby"); + TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, + 20, "bobby"); + TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, + 29, "bobby"); + + TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, + 20, "derek"); + TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, + 29, "derek"); + TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, + 29, "derek"); + TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, + 20, "derek"); + TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", 
config, 5, 15, 1, 1, currentTime - 24, + 29, "derek"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1040,10 +1055,10 @@ public class TestResourceAwareScheduler { config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, + "jerry"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1103,20 +1118,20 @@ public class TestResourceAwareScheduler { config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); - 
TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10); - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); - - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29); - TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, + "derek"); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10, + "derek"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1228,8 +1243,10 @@ public class TestResourceAwareScheduler { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29, + "user"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10, + "user"); Map<String, 
TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1283,11 +1300,12 @@ public class TestResourceAwareScheduler { Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20, + "jerry"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1343,13 +1361,10 @@ public class TestResourceAwareScheduler { StormTopology stormTopology = builder.createTopology(); TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology, - 0, - genExecsAndComps(stormTopology), 0); + 0, genExecsAndComps(stormTopology), 0, "jerry"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo.getId(), topo); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java ---------------------------------------------------------------------- diff 
--git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java index 93e4b75..ce4d707 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java @@ -42,7 +42,7 @@ public class TestUser { Config config = new Config(); config.putAll(Utils.readDefaultConfig()); - List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config); + List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1"); User user1 = new User("user1"); for (TopologyDetails topo : topos) { @@ -64,7 +64,7 @@ public class TestUser { Config config = new Config(); config.putAll(Utils.readDefaultConfig()); - List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config); + List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1"); User user1 = new User("user1"); for (TopologyDetails topo : topos) { @@ -95,7 +95,8 @@ public class TestUser { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200); config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, + Time.currentTimeSecs() - 24, 9, "user1"); User user1 = new User("user1", resourceGuaranteeMap); http://git-wip-us.apache.org/repos/asf/storm/blob/9980b68d/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index bc69725..f745621 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -63,20 +63,20 @@ public class TestUtilsForResourceAwareScheduler { private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class); - public static List<TopologyDetails> getListOfTopologies(Config config) { - - List<TopologyDetails> topos = new LinkedList<TopologyDetails>(); - - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9)); + public static List<TopologyDetails> getListOfTopologies(Config config, String user) { + + List<TopologyDetails> topos = new LinkedList<>(); + + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", 
config, 5, 15, 1, 1, currentTime - 2, 20, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0, user )); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9, user)); return topos; } @@ -140,8 +140,8 @@ public class TestUtilsForResourceAwareScheduler { return retMap; } - public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt, - int spoutParallelism, int boltParallelism, int launchTime, int priority) { + public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt, + int spoutParallelism, int boltParallelism, int launchTime, int priority, String owner) { Config conf = new Config(); conf.putAll(config); @@ -151,7 +151,7 @@ public class TestUtilsForResourceAwareScheduler { StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism); TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology, 0, - genExecsAndComps(topology), launchTime); + genExecsAndComps(topology), launchTime, owner); 
return topo; }
