bipinprasad commented on a change in pull request #3215: Storm3585 - New
compact Constraint config including maxCoLocationCnt
URL: https://github.com/apache/storm/pull/3215#discussion_r387904802
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
##########
@@ -134,48 +252,51 @@ private static boolean checkConstraintsSatisfied(Cluster
cluster, TopologyDetail
return workerToNodes;
}
- private static boolean checkSpreadSchedulingValid(Cluster cluster,
TopologyDetails topo) {
+ private static boolean checkSpreadSchedulingValid(Cluster cluster,
TopologyDetails topo, ConstraintConfig constraintConfig) {
LOG.info("Checking for a valid scheduling...");
assert (cluster.getAssignmentById(topo.getId()) != null);
- Map<ExecutorDetails, WorkerSlot> result =
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+ if (constraintConfig == null) {
+ constraintConfig = new ConstraintConfig(topo);
+ }
Map<ExecutorDetails, String> execToComp =
topo.getExecutorToComponent();
- Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new
HashMap<>();
- Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
- Map<RasNode, HashSet<String>> nodeCompMap = new HashMap<>();
+ Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); //
this is the critical count
Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
boolean ret = true;
- HashSet<String> spreadComps = getSpreadComps(topo);
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet())
{
+ Map<String, Integer> spreadCompCnts =
constraintConfig.maxCoLocationCnts;
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
ExecutorDetails exec = entry.getKey();
+ String comp = execToComp.get(exec);
WorkerSlot worker = entry.getValue();
RasNode node = workerToNodes.get(worker);
-
- if (workerExecMap.computeIfAbsent(worker, (k) -> new
HashSet<>()).contains(exec)) {
- LOG.error("Incorrect Scheduling: Found duplicate in
scheduling");
- return false;
- }
- workerExecMap.get(worker).add(exec);
- String comp = execToComp.get(exec);
- workerCompMap.computeIfAbsent(worker, (k) -> new
HashSet<>()).add(comp);
- if (spreadComps.contains(comp)) {
- if (nodeCompMap.computeIfAbsent(node, (k) -> new
HashSet<>()).contains(comp)) {
- LOG.error("Incorrect Scheduling: Spread for Component: {}
{} on node {} not satisfied {}",
- comp, exec, node.getId(), nodeCompMap.get(node));
+ String nodeId = node.getId();
+
+ if (spreadCompCnts.containsKey(comp)) {
+ int allowedColocationMaxCnt = spreadCompCnts.get(comp);
+ Map<String, Integer> oneNodeCompMap =
nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>());
+ oneNodeCompMap.put(comp, oneNodeCompMap.getOrDefault(comp, 0)
+ 1);
+ if (allowedColocationMaxCnt < oneNodeCompMap.get(comp)) {
+ LOG.error("Incorrect Scheduling: MaxCoLocationCnt for
Component: {} {} on node {} not satisfied, cnt {} > allowed {}",
+ comp, exec, nodeId, oneNodeCompMap.get(comp),
allowedColocationMaxCnt);
ret = false;
}
}
- nodeCompMap.computeIfAbsent(node, (k) -> new
HashSet<>()).add(comp);
+ }
+ if (!ret) {
+ LOG.error("Incorrect MaxCoLocationCnts: Node-Component-Cnt {}",
nodeCompMap);
}
return ret;
}
/**
* Check if resource constraints satisfied.
*/
- private static boolean checkResourcesCorrect(Cluster cluster,
TopologyDetails topo) {
+ private static boolean checkResourcesCorrect(Cluster cluster,
TopologyDetails topo, ConstraintConfig constraintConfig) {
Review comment:
Removed the unused parameter (`constraintConfig`).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services