http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj index 339d5b3..99aeb84 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj @@ -138,7 +138,7 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"})] + executor3 "bolt2"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -175,7 +175,7 @@ 5 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"})] + executor3 "bolt2"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -216,7 +216,7 @@ executor2 "bolt1" executor3 "bolt1" executor4 "bolt1" - executor5 "bolt2"})] + executor5 "bolt2"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -255,7 +255,7 @@ executor2 "bolt1" executor3 "bolt2" executor4 "bolt3" - executor5 "bolt4"})] + executor5 "bolt4"} "user")] (let [node-map (Node/getAllNodesFrom single-cluster) free-pool (FreePool. ) default-pool (DefaultPool. )] @@ -302,7 +302,7 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"}) + executor3 "bolt2"} "user") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 
@@ -310,7 +310,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -383,7 +383,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"})] + executor4 "bolt4"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -427,7 +427,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"})] + executor4 "bolt4"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -474,7 +474,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"}) + executor4 "bolt4"} "user") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-ISOLATED-MACHINES 2} @@ -483,7 +483,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -579,7 +579,7 @@ {executor1 "spout1" executor2 "bolt1" executor3 "bolt2" - executor4 "bolt4"}) + executor4 "bolt4"} "user") topology2 (TopologyDetails. 
"topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-ISOLATED-MACHINES 2} @@ -588,7 +588,7 @@ {executor11 "spout11" executor12 "bolt12" executor13 "bolt13" - executor14 "bolt14"})] + executor14 "bolt14"} "user")] ;; assign one node so it is not in the pool (.assign (.get node-map "super0") "topology1" (list executor1) cluster) (.init free-pool cluster node-map) @@ -629,34 +629,31 @@ (deftest test-multitenant-scheduler (let [supers (gen-supervisors 10) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 4 (mk-ed-map [["spout1" 0 5] ["bolt1" 5 10] ["bolt2" 10 15] - ["bolt3" 15 20]])) + ["bolt3" 15 20]]) "userC") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-ISOLATED-MACHINES 2 - TOPOLOGY-SUBMITTER-USER "userA"} + TOPOLOGY-ISOLATED-MACHINES 2} (StormTopology.) 4 (mk-ed-map [["spout11" 0 5] ["bolt12" 5 6] ["bolt13" 6 7] - ["bolt14" 7 10]])) + ["bolt14" 7 10]]) "userA") topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" - TOPOLOGY-ISOLATED-MACHINES 5 - TOPOLOGY-SUBMITTER-USER "userB"} + TOPOLOGY-ISOLATED-MACHINES 5} (StormTopology.) 10 (mk-ed-map [["spout21" 0 10] ["bolt22" 10 20] ["bolt23" 20 30] - ["bolt24" 30 40]])) + ["bolt24" 30 40]]) "userB") cluster (Cluster. (Nimbus$StandaloneINimbus.) supers {} nil) node-map (Node/getAllNodesFrom cluster) topologies (Topologies. (to-top-map [topology1 topology2 topology3])) @@ -682,14 +679,13 @@ (deftest test-force-free-slot-in-bad-state (let [supers (gen-supervisors 1) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 4 (mk-ed-map [["spout1" 0 5] ["bolt1" 5 10] ["bolt2" 10 15] - ["bolt3" 15 20]])) + ["bolt3" 15 20]]) "userC") existing-assignments { "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. 
"super0" 1) (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20) @@ -719,25 +715,22 @@ (testing "Assiging same worker slot to different topologies is bad state" (let [supers (gen-supervisors 5) topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userC"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 - (mk-ed-map [["spout1" 0 1]])) + (mk-ed-map [["spout1" 0 1]]) "userC") topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-ISOLATED-MACHINES 2 - TOPOLOGY-SUBMITTER-USER "userA"} + TOPOLOGY-ISOLATED-MACHINES 2} (StormTopology.) 2 - (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]])) + (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]) "userA") topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" - TOPOLOGY-ISOLATED-MACHINES 1 - TOPOLOGY-SUBMITTER-USER "userB"} + TOPOLOGY-ISOLATED-MACHINES 1} (StormTopology.) 1 - (mk-ed-map [["spout21" 2 3]])) + (mk-ed-map [["spout21" 2 3]]) "userB") worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1) existing-assignments {"topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments}) "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})} @@ -761,11 +754,10 @@ (let [supers (gen-supervisors 1) port-not-reported-by-supervisor 6 topology1 (TopologyDetails. "topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 - (mk-ed-map [["spout11" 0 1]])) + (mk-ed-map [["spout11" 0 1]]) "userA") existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 0) (WorkerSlot. "super0" port-not-reported-by-supervisor)})} @@ -787,19 +779,17 @@ dead-supervisor "super1" port-not-reported-by-supervisor 6 topology1 (TopologyDetails. 
"topology1" - {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 2 (mk-ed-map [["spout11" 0 1] - ["bolt12" 1 2]])) + ["bolt12" 1 2]]) "userA") topology2 (TopologyDetails. "topology2" - {TOPOLOGY-NAME "topology-name-2" - TOPOLOGY-SUBMITTER-USER "userA"} + {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 2 (mk-ed-map [["spout21" 4 5] - ["bolt22" 5 6]])) + ["bolt22" 5 6]]) "userA") worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1) existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" @@ -837,11 +827,10 @@ supers {"super1" super1 "super2" super2} topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" - TOPOLOGY-SUBMITTER-USER "userA" TOPOLOGY-ISOLATED-MACHINES 1} (StormTopology.) 7 - (mk-ed-map [["spout21" 0 7]])) + (mk-ed-map [["spout21" 0 7]]) "userA") existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 0) (WorkerSlot. "super1" 1)
http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/clj/org/apache/storm/scheduler_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj index 1c479a7..acd1f9f 100644 --- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj +++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj @@ -69,13 +69,13 @@ executor2 (ExecutorDetails. (int 6) (int 10)) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 {executor1 "spout1" - executor2 "bolt1"}) + executor2 "bolt1"} "user") ;; test topology.selectExecutorToComponent executor->comp (.selectExecutorToComponent topology1 (list executor1)) _ (is (= (clojurify-executor->comp {executor1 "spout1"}) (clojurify-executor->comp executor->comp))) ;; test topologies.getById - topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {}) + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {} "user") topologies (Topologies. {"topology1" topology1 "topology2" topology2}) _ (is (= "topology1" (->> "topology1" (.getById topologies) @@ -104,19 +104,19 @@ 2 {executor1 "spout1" executor2 "bolt1" - executor3 "bolt2"}) + executor3 "bolt2"} "user") ;; topology2 is fully scheduled topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 2 {executor11 "spout11" - executor12 "bolt12"}) + executor12 "bolt12"} "user") ;; topology3 needs scheduling, since the assignment is squeezed topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"} (StormTopology.) 2 {executor21 "spout21" - executor22 "bolt22"}) + executor22 "bolt22"} "user") topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3}) executor->slot1 {executor1 (WorkerSlot. 
"supervisor1" (int 1)) executor2 (WorkerSlot. "supervisor2" (int 2))} http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-core/test/jvm/org/apache/storm/MockAutoCred.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/MockAutoCred.java b/storm-core/test/jvm/org/apache/storm/MockAutoCred.java index 457ac50..1a746ca 100644 --- a/storm-core/test/jvm/org/apache/storm/MockAutoCred.java +++ b/storm-core/test/jvm/org/apache/storm/MockAutoCred.java @@ -42,7 +42,7 @@ public class MockAutoCred implements INimbusCredentialPlugin, IAutoCredentials, } @Override - public void populateCredentials(Map<String, String> credentials, Map<String, Object> conf) { + public void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf) { credentials.put(NIMBUS_CRED_KEY, NIMBUS_CRED_VAL); } @@ -58,7 +58,7 @@ public class MockAutoCred implements INimbusCredentialPlugin, IAutoCredentials, } @Override - public void renew(Map<String, String> credentials, Map<String, Object> topologyConf) { + public void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String ownerPrincipal) { credentials.put(NIMBUS_CRED_KEY, NIMBUS_CRED_RENEW_VAL); credentials.put(GATEWAY_CRED_KEY, GATEWAY_CRED_RENEW_VAL); } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 31dc992..69cd649 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -1333,6 +1333,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { BlobStore store = blobStore; Map<String, Object> 
topoConf = readTopoConfAsNimbus(topoId, store); StormTopology topo = readStormTopologyAsNimbus(topoId, store); + if (!base.is_set_principal()) { + fixupBase(base, topoConf); + stormClusterState.updateStorm(topoId, base); + } Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId, base); Map<ExecutorDetails, String> executorsToComponent = new HashMap<>(); for (Entry<List<Integer>, String> entry: rawExecToComponent.entrySet()) { @@ -1341,19 +1345,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { executorsToComponent.put(execDetails, entry.getValue()); } - return new TopologyDetails(topoId, topoConf, topo, base.get_num_workers(), executorsToComponent, base.get_launch_time_secs()); + return new TopologyDetails(topoId, topoConf, topo, base.get_num_workers(), executorsToComponent, base.get_launch_time_secs(), + base.get_owner()); } private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) { LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors); IStormClusterState state = stormClusterState; Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port())); - Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), executorBeats, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); + Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), + executorBeats, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache)); } /** - * update all the heartbeats for all the topologies' executors + * Update all the heartbeats for all the topologies' executors. 
* @param existingAssignments current assignments (thrift) * @param topologyToExecutors topology ID to executors. */ @@ -1369,8 +1375,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Map<List<Integer>, Map<String, Object>> hbCache = heartbeatsCache.get().get(topoId); LOG.debug("NEW Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}", topoId, allExecutors, assignment, hbCache); - //TODO need to consider all executors associated with a dead executor (in same slot) dead as well, - // don't just rely on heartbeat being the same int taskLaunchSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)); Set<List<Integer>> ret = new HashSet<>(); @@ -1759,7 +1763,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { // we exclude its assignment, meaning that all the slots occupied by its assignment // will be treated as free slot in the scheduler code. if (!id.equals(scratchTopoId)) { - existingAssignments.put(id, state.assignmentInfo(id, null)); + Assignment currentAssignment = state.assignmentInfo(id, null); + if (!currentAssignment.is_set_owner()) { + TopologyDetails td = tds.get(id); + if (td != null) { + currentAssignment.set_owner(td.getTopologySubmitter()); + state.setAssignment(id, currentAssignment); + } + } + existingAssignments.put(id, currentAssignment); } } // make the new assignments for topologies @@ -1842,6 +1854,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { workerResources.put(ni, resources); } newAssignment.set_worker_resources(workerResources); + TopologyDetails td = tds.get(topoId); + newAssignment.set_owner(td.getTopologySubmitter()); newAssignments.put(topoId, newAssignment); } @@ -1894,7 +1908,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } - private void startTopology(String topoName, String topoId, TopologyStatus initStatus) throws KeyNotFoundException, AuthorizationException, IOException, 
InvalidTopologyException { + private void fixupBase(StormBase base, Map<String, Object> topoConf) { + base.set_owner((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)); + base.set_principal((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL)); + } + + private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner, String principal) + throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException { assert(TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus); IStormClusterState state = stormClusterState; BlobStore store = blobStore; @@ -1911,7 +1931,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { base.set_status(initStatus); base.set_num_workers(ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 0)); base.set_component_executors(numExecutors); - base.set_owner((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER)); + base.set_owner(owner); + base.set_principal(principal); base.set_component_debug(new HashMap<>()); state.activateStorm(topoId, base); notifyTopologyActionListener(topoName, "activate"); @@ -2162,9 +2183,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { BlobStore store = blobStore; Collection<ICredentialsRenewer> renewers = credRenewers; Object lock = credUpdateLock; - List<String> assignedIds = state.activeStorms(); - if (assignedIds != null) { - for (String id: assignedIds) { + Map<String, StormBase> assignedBases = state.topologyBases(); + if (assignedBases != null) { + for (Entry<String, StormBase> entry: assignedBases.entrySet()) { + String id = entry.getKey(); + String ownerPrincipal = entry.getValue().get_principal(); Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store))); synchronized(lock) { Credentials origCreds = state.credentials(id, null); @@ -2172,8 +2195,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Map<String, String> 
origCredsMap = origCreds.get_creds(); Map<String, String> newCredsMap = new HashMap<>(origCredsMap); for (ICredentialsRenewer renewer: renewers) { - LOG.info("Renewing Creds For {} with {}", id, renewer); - renewer.renew(newCredsMap, topoConf); + LOG.info("Renewing Creds For {} with {} owned by {}", id, renewer, ownerPrincipal); + renewer.renew(newCredsMap, topoConf, ownerPrincipal); } if (!newCredsMap.equals(origCredsMap)) { state.setCredentials(id, new Credentials(newCredsMap), topoConf); @@ -2340,7 +2363,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { ret.launchTimeSecs = 0; } ret.assignment = state.assignmentInfo(topoId, null); - ret.beats = Utils.OR(heartbeatsCache.get().get(topoId), Collections.<List<Integer>, Map<String, Object>>emptyMap()); + ret.beats = Utils.OR(heartbeatsCache.get().get(topoId), Collections.emptyMap()); ret.allComponents = new HashSet<>(ret.taskToComponent.values()); return ret; } @@ -2525,9 +2548,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Set<String> topoAcl = new HashSet<>((List<String>)topoConf.getOrDefault(Config.TOPOLOGY_USERS, Collections.emptyList())); topoAcl.add(submitterPrincipal); topoAcl.add(submitterUser); - - topoConf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, Utils.OR(submitterPrincipal, "")); - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, Utils.OR(submitterUser, systemUser)); //Don't let the user set who we launch as + + String topologyPrincipal = Utils.OR(submitterPrincipal, ""); + topoConf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, topologyPrincipal); + String topologyOwner = Utils.OR(submitterUser, systemUser); + topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, topologyOwner); //Don't let the user set who we launch as topoConf.put(Config.TOPOLOGY_USERS, new ArrayList<>(topoAcl)); topoConf.put(Config.STORM_ZOOKEEPER_SUPERACL, conf.get(Config.STORM_ZOOKEEPER_SUPERACL)); if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) { @@ -2605,7 +2630,7 @@ public class Nimbus 
implements Iface, Shutdownable, DaemonCommon { throw new IllegalArgumentException("Inital Status of " + options.get_initial_status() + " is not allowed."); } - startTopology(topoName, topoId, status); + startTopology(topoName, topoId, status, topologyOwner, topologyPrincipal); } } catch (Exception e) { LOG.warn("Topology submission exception. (topology name='{}')", topoName, e); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java index f5eed43..9253169 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -402,7 +402,7 @@ public class BasicContainer extends Container { * Compute the classpath for the worker process * @param stormJar the topology jar * @param dependencyLocations any dependencies from the topology - * @param stormVerison the version of the storm framework to use + * @param topoVersion the version of the storm framework to use * @return the full classpath */ protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations, SimpleVersion topoVersion) { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java index f878507..bfd55da 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java +++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java @@ -321,7 +321,7 @@ public abstract class Container implements Killable { File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); if (!_ops.fileExists(workerArtifacts)) { _ops.forceMkdir(workerArtifacts); - _ops.setupWorkerArtifactsDir(_topoConf, workerArtifacts); + _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts); } String user = getWorkerUser(); @@ -472,8 +472,8 @@ public abstract class Container implements Killable { if (_ops.fileExists(file)) { return _ops.slurpString(file).trim(); - } else if (_topoConf != null) { - return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + } else if (_assignment != null && _assignment.is_set_owner()) { + return _assignment.get_owner(); } if (ConfigUtils.isLocalMode(_conf)) { return System.getProperty("user.name"); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java index 8fdd47c..6acc4b5 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -276,6 +276,9 @@ public class ReadClusterState implements Runnable, AutoCloseable { if (slotsResources.containsKey(port)) { localAssignment.set_resources(slotsResources.get(port)); } + if (assignment.is_set_owner()) { + localAssignment.set_owner(assignment.get_owner()); + } portTasks.put(port.intValue(), localAssignment); } List<ExecutorInfo> executorInfoList = localAssignment.get_executors(); 
http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index cc4a387..1f8d4c3 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -211,8 +211,20 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.readState = new ReadClusterState(this); Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf); - for (String topoId : downloadedTopoIds) { - SupervisorUtils.addBlobReferences(localizer, topoId, conf); + Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap(); + if (portToAssignments != null) { + Map<String, LocalAssignment> assignments = new HashMap<>(); + for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) { + assignments.put(la.get_topology_id(), la); + } + for (String topoId : downloadedTopoIds) { + LocalAssignment la = assignments.get(topoId); + if (la != null) { + SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner()); + } else { + LOG.warn("Could not find an owner for topo {}", topoId); + } + } } // do this after adding the references so we don't try to clean things being used localizer.startCleaner(); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 
8582fb4..09c2b5d 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -19,6 +19,7 @@ package org.apache.storm.daemon.supervisor; import org.apache.storm.Config; import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; import org.apache.storm.localizer.LocalResource; import org.apache.storm.localizer.Localizer; import org.apache.storm.utils.ConfigUtils; @@ -95,16 +96,16 @@ public class SupervisorUtils { } /** - * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart. + * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the + * cache on restart. * * @param localizer * @param stormId * @param conf */ - static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException { + static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException { Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId); Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); if (blobstoreMap != null) { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java index af864e8..d0baf90 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@ -17,6 +17,7 @@ */ package org.apache.storm.daemon.supervisor.timer; +import java.util.HashMap; import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -59,15 +60,16 @@ public class UpdateBlobs implements Runnable { Map<String, Object> conf = supervisor.getConf(); Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf); AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment(); - Set<String> assignedStormIds = new HashSet<>(); + Map<String, LocalAssignment> assignedStormIds = new HashMap<>(); for (LocalAssignment localAssignment : newAssignment.get().values()) { - assignedStormIds.add(localAssignment.get_topology_id()); + assignedStormIds.put(localAssignment.get_topology_id(), localAssignment); } for (String stormId : downloadedStormIds) { - if (assignedStormIds.contains(stormId)) { + LocalAssignment la = assignedStormIds.get(stormId); + if (la != null) { String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot); - updateBlobsForTopology(conf, stormId, supervisor.getLocalizer()); + updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner()); } } } catch (Exception e) { @@ -89,10 +91,9 @@ public class UpdateBlobs implements Runnable { * @param localizer * @throws IOException */ - private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer) throws IOException { + private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer 
localizer, String user) throws IOException { Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId); Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); try { localizer.updateBlobs(localresources, user); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java index 038114a..9dc473f 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java @@ -101,10 +101,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBaseBlobsDistributed implements Callable<Void> { protected final String _topologyId; protected final File _stormRoot; - - public DownloadBaseBlobsDistributed(String topologyId) throws IOException { + protected final String owner; + + public DownloadBaseBlobsDistributed(String topologyId, String owner) throws IOException { _topologyId = topologyId; _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId)); + this.owner = owner; } protected void downloadBaseBlobs(File tmproot) throws Exception { @@ -145,7 +147,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { try { downloadBaseBlobs(tr); _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot); - _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot); + _fsOps.setupStormCodeDir(owner, _stormRoot); deleteAll = false; } finally 
{ if (deleteAll) { @@ -164,8 +166,8 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed { - public DownloadBaseBlobsLocal(String topologyId) throws IOException { - super(topologyId); + public DownloadBaseBlobsLocal(String topologyId, String owner) throws IOException { + super(topologyId, owner); } @Override @@ -209,9 +211,11 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private class DownloadBlobs implements Callable<Void> { private final String _topologyId; + private final String topoOwner; - public DownloadBlobs(String topologyId) { + public DownloadBlobs(String topologyId, String topoOwner) { _topologyId = topologyId; + this.topoOwner = topoOwner; } @Override @@ -222,7 +226,6 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { @SuppressWarnings("unchecked") Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); List<LocalResource> localResourceList = new ArrayList<>(); @@ -246,12 +249,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { } if (!localResourceList.isEmpty()) { - File userDir = _localizer.getLocalUserFileCacheDir(user); + File userDir = _localizer.getLocalUserFileCacheDir(topoOwner); if (!_fsOps.fileExists(userDir)) { _fsOps.forceMkdir(userDir); } - List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir); - _fsOps.setupBlobPermissions(userDir, user); + List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir); + _fsOps.setupBlobPermissions(userDir, topoOwner); if (!_symlinksDisabled) { for (LocalizedResource localizedResource : localizedResources) { String keyName = 
localizedResource.getKey(); @@ -309,9 +312,9 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { if (localResource == null) { Callable<Void> c; if (_isLocalMode) { - c = new DownloadBaseBlobsLocal(topologyId); + c = new DownloadBaseBlobsLocal(topologyId, assignment.get_owner()); } else { - c = new DownloadBaseBlobsDistributed(topologyId); + c = new DownloadBaseBlobsDistributed(topologyId, assignment.get_owner()); } localResource = new LocalDownloadedResource(_execService.submit(c)); _basicPending.put(topologyId, localResource); @@ -362,7 +365,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { final String topologyId = assignment.get_topology_id(); LocalDownloadedResource localResource = _blobPending.get(topologyId); if (localResource == null) { - Callable<Void> c = new DownloadBlobs(topologyId); + Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner()); localResource = new LocalDownloadedResource(_execService.submit(c)); _blobPending.put(topologyId, localResource); } @@ -385,7 +388,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { @SuppressWarnings("unchecked") Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); if (blobstoreMap != null) { - String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String user = assignment.get_owner(); String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 
e0f57cd..95cbf63 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -85,7 +85,7 @@ public class MultitenantScheduler implements IScheduler { defaultPool.init(cluster, nodeIdToNode); for (TopologyDetails td: topologies.getTopologies()) { - String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER); + String user = td.getTopologySubmitter(); LOG.debug("Found top {} run by user {}",td.getId(), user); NodePool pool = userPools.get(user); if (pool == null || !pool.canAdd(td)) { http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/MessagingTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java b/storm-server/src/test/java/org/apache/storm/MessagingTest.java index 5686bc3..b0b6625 100644 --- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java +++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java @@ -62,7 +62,4 @@ public class MessagingTest { Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size()); } } - - - } http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java index 2159b4a..35c5790 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java @@ -162,8 +162,7 @@ public class ContainerTest { final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b"); final 
List<String> logGroups = Arrays.asList("l-group-a", "l-group-b"); - - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); + topoConf.put(DaemonConfig.LOGS_GROUPS, logGroups); topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups); topoConf.put(DaemonConfig.LOGS_USERS, logUsers); @@ -183,6 +182,7 @@ public class ContainerTest { LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, "SUPERVISOR", 8080, la, null, workerId, topoConf, ops); @@ -235,7 +235,6 @@ public class ContainerTest { final File workerPidsRoot = new File(workerRoot, "pids"); final Map<String, Object> topoConf = new HashMap<>(); - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); @@ -252,6 +251,7 @@ public class ContainerTest { ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class); LocalAssignment la = new LocalAssignment(); + la.set_owner(user); la.set_topology_id(topoId); MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, "SUPERVISOR", port, la, iso, workerId, topoConf, ops); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index c04cfbe..2461b33 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -47,8 +47,10 @@ public class AsyncLocalizerTest { @Test public void testRequestDownloadBaseTopologyBlobs() throws Exception { final String topoId = "TOPO"; + final String user = "user"; LocalAssignment la = new 
LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); ExecutorInfo ei = new ExecutorInfo(); ei.set_task_start(1); ei.set_task_end(1); @@ -96,7 +98,7 @@ public class AsyncLocalizerTest { //Extracting the dir from the jar verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), eq("resources"), any(File.class)); verify(ops).moveDirectoryPreferAtomic(any(File.class), eq(fStormRoot)); - verify(ops).setupStormCodeDir(topoConf, fStormRoot); + verify(ops).setupStormCodeDir(user, fStormRoot); verify(ops, never()).deleteIfExists(any(File.class)); } finally { @@ -110,15 +112,16 @@ public class AsyncLocalizerTest { @Test public void testRequestDownloadTopologyBlobs() throws Exception { final String topoId = "TOPO-12345"; + final String user = "user"; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); + la.set_owner(user); ExecutorInfo ei = new ExecutorInfo(); ei.set_task_start(1); ei.set_task_end(1); la.add_to_executors(ei); final String topoName = "TOPO"; final int port = 8080; - final String user = "user"; final String simpleLocalName = "simple.txt"; final String simpleKey = "simple"; @@ -149,7 +152,6 @@ public class AsyncLocalizerTest { Map<String, Object> topoConf = new HashMap<>(conf); topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap); - topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); topoConf.put(Config.TOPOLOGY_NAME, topoName); List<LocalizedResource> localizedList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index e81f7bf..65cad2d 100644 --- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -80,7 +80,6 @@ public class TestResourceAwareScheduler { defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0); defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0); defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0); - defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo"); } @Test @@ -104,7 +103,8 @@ public class TestResourceAwareScheduler { Assert.assertEquals(0, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<>(), 1, 0, 2, 0, 0, 0); + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<>(), 1, 0, 2, 0, 0, 0, + "user"); List<ExecutorDetails> executors11 = new ArrayList<>(); executors11.add(new ExecutorDetails(1, 1)); @@ -124,7 +124,8 @@ public class TestResourceAwareScheduler { Assert.assertEquals(2, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<>(), 1, 0, 2, 0, 0, 0); + TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<>(), 1, 0, 2, 0, 0, 0, + "user"); List<ExecutorDetails> executors21 = new ArrayList<>(); executors21.add(new ExecutorDetails(1, 1)); @@ -166,7 +167,7 @@ public class TestResourceAwareScheduler { Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); - TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0); + TopologyDetails topology1 = 
TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0, "user"); Map<String, TopologyDetails> topoMap = new HashMap<>(); topoMap.put(topology1.getId(), topology1); Topologies topologies = new Topologies(topoMap); @@ -209,14 +210,14 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0, "user"); TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions builder2.setSpout("wordSpoutX", new TestWordSpout(), 1); builder2.setSpout("wordSpoutY", new TestWordSpout(), 1); StormTopology stormTopology2 = builder2.createTopology(); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -272,7 +273,7 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new 
HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -320,7 +321,7 @@ public class TestResourceAwareScheduler { Config config = new Config(); config.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); ResourceAwareScheduler rs = new ResourceAwareScheduler(); @@ -410,7 +411,7 @@ public class TestResourceAwareScheduler { Config config1 = new Config(); config1.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0, "user"); TopologyBuilder builder2 = new TopologyBuilder(); builder2.setSpout("wordSpout2", new TestWordSpout(), 2); @@ -420,7 +421,7 @@ public class TestResourceAwareScheduler { // memory requirement is large enough so that two executors can not be fully assigned to one node config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0, "user"); // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers Cluster cluster = new 
Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); @@ -568,7 +569,7 @@ public class TestResourceAwareScheduler { Config config1 = new Config(); config1.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0, "user"); // topo2 has 4 large tasks TopologyBuilder builder2 = new TopologyBuilder(); @@ -577,7 +578,7 @@ public class TestResourceAwareScheduler { Config config2 = new Config(); config2.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user"); // topo3 has 4 large tasks TopologyBuilder builder3 = new TopologyBuilder(); @@ -586,7 +587,7 @@ public class TestResourceAwareScheduler { Config config3 = new Config(); config3.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap3 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3); - TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0); + TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0, "user"); // topo4 has 12 small tasks, whose mem usage does not exactly divide a node's mem capacity TopologyBuilder builder4 = new TopologyBuilder(); @@ -595,7 +596,7 @@ public class TestResourceAwareScheduler { Config config4 = new Config(); config4.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap4 = 
TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4); - TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0); + TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0, "user"); // topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster TopologyBuilder builder5 = new TopologyBuilder(); @@ -604,7 +605,7 @@ public class TestResourceAwareScheduler { Config config5 = new Config(); config5.putAll(defaultTopologyConf); Map<ExecutorDetails, String> executorMap5 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5); - TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0); + TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0, "user"); // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); @@ -692,7 +693,7 @@ public class TestResourceAwareScheduler { config1.putAll(defaultTopologyConf); config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1); - TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0); + TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0, "user"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1); ResourceAwareScheduler rs = new ResourceAwareScheduler(); Map<String, TopologyDetails> topoMap = new HashMap<>(); @@ -714,7 +715,7 @@ public class TestResourceAwareScheduler { config2.putAll(defaultTopologyConf); 
config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2); - TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0); + TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user"); cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config2); topoMap = new HashMap<>(); topoMap.put(topology2.getId(), topology2); @@ -770,16 +771,17 @@ public class TestResourceAwareScheduler { config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); - - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20); + TopologyDetails topo4 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20, + "bobby"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -829,8 +831,6 @@ public class TestResourceAwareScheduler { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 128.0); config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0); - config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER); - Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>(); resourceUserPool.put("jerry", new HashMap<String, Number>()); resourceUserPool.get("jerry").put("cpu", 1000); @@ -846,11 +846,16 @@ public class TestResourceAwareScheduler { config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30); - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, + 20, TOPOLOGY_SUBMITTER); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, + 30, TOPOLOGY_SUBMITTER); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", 
config, 5, 15, 1, 1, currentTime - 16, + 30, TOPOLOGY_SUBMITTER); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, + 20, TOPOLOGY_SUBMITTER); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, + 30, TOPOLOGY_SUBMITTER); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -895,7 +900,8 @@ public class TestResourceAwareScheduler { LOG.info("{} - {}", topo.getName(), queue); Assert.assertEquals("check order", topo.getName(), "topo-2"); - TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, 10); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, + 10, TOPOLOGY_SUBMITTER); topoMap.put(topo6.getId(), topo6); topologies = new Topologies(topoMap); @@ -955,29 +961,38 @@ public class TestResourceAwareScheduler { config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 29); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); - - 
TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 29); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); - - TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, 29); - TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, 29); - TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, 20); - TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, 29); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, + 29, "jerry"); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, + 20, "jerry"); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, 
currentTime - 24, + 29, "jerry"); + + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20, + "bobby"); + TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29, + "bobby"); + TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, + 29, "bobby"); + TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, + 20, "bobby"); + TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, + 29, "bobby"); + + TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, + 20, "derek"); + TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, + 29, "derek"); + TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, + 29, "derek"); + TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, + 20, "derek"); + TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, + 29, "derek"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1041,10 +1056,10 @@ public class TestResourceAwareScheduler { config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20); - TopologyDetails topo2 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, + "jerry"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1104,20 +1119,20 @@ public class TestResourceAwareScheduler { config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20); - - config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby"); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20, + "jerry"); - TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10); - TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); + TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10, + "bobby"); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek"); - - TopologyDetails topo5 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29); - TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10); + TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, + "derek"); + TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10, + "derek"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1229,8 +1244,10 @@ public class TestResourceAwareScheduler { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29, + "user"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10, + "user"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1284,11 +1301,12 @@ public class TestResourceAwareScheduler { Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10); - TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20); - TopologyDetails topo3 = 
TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10, + "jerry"); + TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20, + "jerry"); + TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20, + "jerry"); Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo1.getId(), topo1); @@ -1344,13 +1362,10 @@ public class TestResourceAwareScheduler { StormTopology stormTopology = builder.createTopology(); TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology, - 0, - genExecsAndComps(stormTopology), 0); + 0, genExecsAndComps(stormTopology), 0, "jerry"); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config); - config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry"); - Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>(); topoMap.put(topo.getId(), topo); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java index 73c5d2c..b36d83d 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUser.java @@ -42,7 +42,7 @@ public class TestUser { Config config = new Config(); config.putAll(Utils.readDefaultConfig()); - List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config); + List<TopologyDetails> topos 
= TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1"); User user1 = new User("user1"); for (TopologyDetails topo : topos) { @@ -64,7 +64,7 @@ public class TestUser { Config config = new Config(); config.putAll(Utils.readDefaultConfig()); - List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config); + List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1"); User user1 = new User("user1"); for (TopologyDetails topo : topos) { @@ -95,7 +95,8 @@ public class TestUser { config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200); config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200); - TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9); + TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, + Time.currentTimeSecs() - 24, 9, "user1"); User user1 = new User("user1", resourceGuaranteeMap); http://git-wip-us.apache.org/repos/asf/storm/blob/dbf7b998/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index ba17ce3..beca679 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -63,20 +63,20 @@ public class TestUtilsForResourceAwareScheduler { private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class); - public static List<TopologyDetails> getListOfTopologies(Config config) { - 
- List<TopologyDetails> topos = new LinkedList<TopologyDetails>(); - - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8)); - topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9)); + public static List<TopologyDetails> getListOfTopologies(Config config, String user) { + + List<TopologyDetails> topos = new LinkedList<>(); + + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30, user)); + 
topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8, user)); + topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9, user)); return topos; } @@ -141,7 +141,7 @@ public class TestUtilsForResourceAwareScheduler { } public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt, - int spoutParallelism, int boltParallelism, int launchTime, int priority) { + int spoutParallelism, int boltParallelism, int launchTime, int priority, String owner) { Config conf = new Config(); conf.putAll(config); @@ -151,7 +151,7 @@ public class TestUtilsForResourceAwareScheduler { StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism); TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology, 0, - genExecsAndComps(topology), launchTime); + genExecsAndComps(topology), launchTime, owner); return topo; }
