Github user jerrypeng commented on a diff in the pull request:
https://github.com/apache/storm/pull/921#discussion_r47992737
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java ---
@@ -128,56 +308,121 @@ private void updateSupervisorsResources(Cluster cluster, Topologies topologies)
         cluster.setSupervisorsResourcesMap(supervisors_resources);
     }
-    private Map<String, Double> getUserConf() {
-        Map<String, Double> ret = new HashMap<String, Double>();
-        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, (Double) _conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
-        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, (Double) _conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
-        return ret;
+    public User getUser(String user) {
+        return this.userMap.get(user);
+    }
+
+    public Map<String, User> getUserMap() {
+        return this.userMap;
     }
     /**
-     * print scheduling for debug purposes
-     * @param cluster
+     * Initialize scheduling and running queues
+     *
      * @param topologies
+     * @param cluster
     */
-    public String printScheduling(Cluster cluster, Topologies topologies) {
-        StringBuilder str = new StringBuilder();
-        Map<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>> schedulingMap = new HashMap<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>>();
-        for (TopologyDetails topo : topologies.getTopologies()) {
-            if (cluster.getAssignmentById(topo.getId()) != null) {
-                for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
-                    WorkerSlot slot = entry.getValue();
-                    String nodeId = slot.getNodeId();
-                    ExecutorDetails exec = entry.getKey();
-                    if (schedulingMap.containsKey(nodeId) == false) {
-                        schedulingMap.put(nodeId, new HashMap<String, Map<WorkerSlot, Collection<ExecutorDetails>>>());
-                    }
-                    if (schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
-                        schedulingMap.get(nodeId).put(topo.getId(), new HashMap<WorkerSlot, Collection<ExecutorDetails>>());
-                    }
-                    if (schedulingMap.get(nodeId).get(topo.getId()).containsKey(slot) == false) {
-                        schedulingMap.get(nodeId).get(topo.getId()).put(slot, new LinkedList<ExecutorDetails>());
-                    }
-                    schedulingMap.get(nodeId).get(topo.getId()).get(slot).add(exec);
+    private void initUsers(Topologies topologies, Cluster cluster) {
+        this.userMap = new HashMap<String, User>();
+        Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
+        LOG.debug("userResourcePools: {}", userResourcePools);
+
+        for (TopologyDetails td : topologies.getTopologies()) {
+            //Get the user that submitted the topology. If the topology submitter is null or an empty string,
+            //the topologySubmitter will be set to anonymous
+            String topologySubmitter = td.getTopologySubmitter();
+            //additional safety check to make sure that topologySubmitter is a valid value
+            if (topologySubmitter == null || topologySubmitter.equals("")) {
+                LOG.error("Cannot determine user for topology {}. Will skip scheduling this topology", td.getName());
+                continue;
+            }
+            if (!this.userMap.containsKey(topologySubmitter)) {
+                this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
+            }
+            if (cluster.getUnassignedExecutors(td).size() > 0) {
+                LOG.debug("adding td: {} to pending queue", td.getName());
+                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
+            } else {
+                LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
+                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
+                if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
+                    cluster.setStatus(td.getId(), "Fully Scheduled");
                }
            }
        }
+    }
-        for (Map.Entry<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>> entry : schedulingMap.entrySet()) {
-            if (cluster.getSupervisorById(entry.getKey()) != null) {
-                str.append("/** Node: " + cluster.getSupervisorById(entry.getKey()).getHost() + "-" + entry.getKey() + " **/\n");
-            } else {
-                str.append("/** Node: Unknown may be dead -" + entry.getKey() + " **/\n");
+    private void initialize(Topologies topologies, Cluster cluster) {
+        this.cluster = cluster;
+        this.topologies = topologies;
+        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+        initUsers(topologies, cluster);
+    }
+
+    /**
+     * Get resource guarantee configs
+     *
+     * @return a map from user name to that user's resource guarantees
+     */
+    private Map<String, Map<String, Double>> getUserResourcePools() {
+        Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
+
+        if (raw != null) {
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+                String user = userPoolEntry.getKey();
+                ret.put(user, new HashMap<String, Double>());
+                for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
+                    ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
+                }
            }
-            for (Map.Entry<String, Map<WorkerSlot, Collection<ExecutorDetails>>> topo_sched : schedulingMap.get(entry.getKey()).entrySet()) {
-                str.append("\t-->Topology: " + topo_sched.getKey() + "\n");
-                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> ws : topo_sched.getValue().entrySet()) {
-                    str.append("\t\t->Slot [" + ws.getKey().getPort() + "] -> " + ws.getValue() + "\n");
+        }
+
+        Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
+        Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        if (tmp != null) {
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : tmp.entrySet()) {
+                String user = userPoolEntry.getKey();
+                ret.put(user, new HashMap<String, Double>());
+                for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
+                    ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                }
            }
        }
-        return str.toString();
+        return ret;
+    }
+
+    private SchedulingState checkpointSchedulingState() {
+        LOG.debug("/*********Checkpoint scheduling state************/");
+        for (User user : this.getUserMap().values()) {
+            LOG.debug(user.getDetailedInfo());
+        }
+        LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
+        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug("/*********End************/");
+        return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
+    }
+
+    private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
+        LOG.debug("/*********restoring scheduling state************/");
+        //resetting the cluster
+        //Cannot simply set this.cluster = schedulingState.cluster, since the Clojure side holds the original (immutable) Cluster reference; restore its contents in place instead
+        this.cluster.setAssignments(schedulingState.cluster.getAssignments());
+        this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
+        this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
+        this.cluster.setResourcesMap(schedulingState.cluster.getResourcesMap());
+        //unlike Cluster, the other data structures don't need to be restored in place, since nothing in them
+        //can really be changed unless this.topologies is set to another object
+        this.topologies = schedulingState.topologies;
+        this.conf = schedulingState.conf;
+        this.userMap = schedulingState.userMap;
+        this.nodes = schedulingState.nodes;
+
+        for (User user : this.getUserMap().values()) {
+            LOG.debug(user.getDetailedInfo());
+        }
+        LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
+        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug("/*********End************/");
--- End diff ---
We could, but as per our discussion, can we perhaps look into changing that in the future, if that refactor is really needed?
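
For context, getUserResourcePools() above takes per-user resource guarantees from Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS in the scheduler conf, then overlays anything found in a user-resource-pools.yaml on the classpath (entries from the file overwrite conf entries for the same user). A minimal sketch of what such a file might contain, assuming the config key resolves to "resource.aware.scheduler.user.pools" and that "cpu" and "memory" are the resource names in use; the users and amounts here are made-up values, not taken from this PR:

    resource.aware.scheduler.user.pools:
        jerry:
            cpu: 1000
            memory: 8192.0
        derek:
            cpu: 10000.0
            memory: 32768

Both integer and floating-point amounts work, since every value is normalized through Number.doubleValue() before being stored in the returned map.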