Ethanlm commented on a change in pull request #3254:
URL: https://github.com/apache/storm/pull/3254#discussion_r413975679
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
##########
@@ -72,13 +74,23 @@ public void prepare(Map<String, Object> conf) {
}
}
+ // update countMap failures for sendAssignments failing
+ for (Map<String, Integer> item : sendAssignmentFailureCount) {
+ for (Map.Entry<String, Integer> entry : item.entrySet()) {
+ String supervisorNode = entry.getKey();
+ int sendAssignmentFailures = entry.getValue() +
countMap.getOrDefault(supervisorNode, 0);
+ countMap.put(supervisorNode, sendAssignmentFailures);
+ }
+ }
+
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
String supervisor = entry.getKey();
int count = entry.getValue();
if (count >= toleranceCount) {
if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
LOG.debug("Added supervisor {} to blacklist", supervisor);
LOG.debug("supervisorsWithFailures : {}",
supervisorsWithFailures);
+ LOG.debug("sendAssignmentFailureCount: {}",
sendAssignmentFailureCount);
reporter.reportBlacklist(supervisor,
supervisorsWithFailures);
Review comment:
This `reportBlacklist` call no longer fits the need now that we also count
send-assignment failures: it still reports only `supervisorsWithFailures`.
We may need to fix it as a follow-up. I'm not sure what the best way to do
that is.
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
##########
@@ -155,6 +163,18 @@ private void badSupervisors(Map<String, SupervisorDetails>
supervisors) {
badSupervisorsToleranceSlidingWindow.add(badSupervisors);
}
+ private void trackAssignmentFailures() {
+ if (!blacklistSendAssignentFailures) {
+ return;
+ }
+ Map<String, Integer> assignmentFailureWindow = new HashMap<>();
+ assignmentFailureWindow.putAll(this.assignmentFailures);
+ synchronized (assignmentFailures) {
Review comment:
Since we already synchronize on `assignmentFailures`, we might as well move
`assignmentFailureWindow.putAll(this.assignmentFailures);` inside the
`synchronized` block. Then the map doesn't need to be a `ConcurrentHashMap`,
and we reduce two synchronization mechanisms to one.
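A minimal sketch of what this could look like, not the actual patch. The class
name `AssignmentFailureTracker` and the methods `recordFailure` and
`drainWindow` are hypothetical stand-ins. It also assumes that the
`synchronized` block goes on to clear the map (the hunk above is cut off at
that point) and that every other access to the map takes the same lock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the relevant part of BlacklistScheduler.
class AssignmentFailureTracker {
    // A plain HashMap is enough when every access is guarded by
    // synchronized (assignmentFailures).
    private final Map<String, Integer> assignmentFailures = new HashMap<>();

    // Hypothetical writer: called by whichever thread observes a failed send.
    void recordFailure(String supervisorNode) {
        synchronized (assignmentFailures) {
            assignmentFailures.merge(supervisorNode, 1, Integer::sum);
        }
    }

    // Copy and reset inside one critical section, so no failure recorded
    // between the copy and the reset can be lost.
    Map<String, Integer> drainWindow() {
        Map<String, Integer> assignmentFailureWindow = new HashMap<>();
        synchronized (assignmentFailures) {
            assignmentFailureWindow.putAll(assignmentFailures);
            assignmentFailures.clear(); // assumption: the real block resets the counts
        }
        return assignmentFailureWindow;
    }
}
```

With the copy outside the lock, a concurrent writer could add a failure after
`putAll` but before the reset, and that count would be dropped.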
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
##########
@@ -51,12 +52,15 @@
protected IBlacklistStrategy blacklistStrategy;
protected int nimbusMonitorFreqSecs;
protected Map<String, Set<Integer>> cachedSupervisors;
- //key is supervisor key ,value is supervisor ports
- protected EvictingQueue<HashMap<String, Set<Integer>>>
badSupervisorsToleranceSlidingWindow;
+ // key is supervisor nodeId, value is supervisor ports
+ protected EvictingQueue<Map<String, Set<Integer>>>
badSupervisorsToleranceSlidingWindow;
+ protected EvictingQueue<Map<String, Integer>> sendAssignmentFailureCount;
+ private Map<String, Integer> assignmentFailures = new
ConcurrentHashMap<>();
Review comment:
Make this `final`, since we synchronize on it.
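To illustrate the concern, here is a small sketch with hypothetical names
(`LockIdentityDemo`, `currentLock`, `reset`); it is not code from this PR.
When the field used as a lock is not `final`, it can be reassigned, and
threads that enter `synchronized` before and after the reassignment hold
different monitors.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Storm.
class LockIdentityDemo {
    // Not final, so nothing stops a later reassignment.
    private Map<String, Integer> assignmentFailures = new HashMap<>();

    // Returns the object that synchronized (assignmentFailures) would lock now.
    Object currentLock() {
        return assignmentFailures;
    }

    // Compiles only because the field is not final. After this call,
    // new synchronized blocks lock a different object than earlier ones.
    void reset() {
        assignmentFailures = new HashMap<>();
    }
}
```

Declaring the field `final` turns `reset()` into a compile error, so the lock
object stays the same for the lifetime of the instance.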