STORM-2083 Blacklist scheduler

* address review comments from @HeartSaVioR and @revans2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7b86c32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7b86c32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7b86c32

Branch: refs/heads/master
Commit: a7b86c328ad56adf7e7259aeb2a9bcc1ece88b4f
Parents: 25c27c6
Author: Jungtaek Lim <[email protected]>
Authored: Mon Sep 25 22:16:48 2017 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Sep 27 14:09:31 2017 +0900

----------------------------------------------------------------------
 .../scheduler/blacklist/BlacklistScheduler.java | 113 +++++++++----------
 .../blacklist/reporters/IReporter.java          |   4 +-
 .../blacklist/reporters/LogReporter.java        |   8 +-
 .../strategies/DefaultBlacklistStrategy.java    |  68 +++++------
 .../strategies/IBlacklistStrategy.java          |   2 +-
 .../scheduler/blacklist/FaultGenerateUtils.java |  11 --
 .../blacklist/TestBlacklistScheduler.java       |  67 +++++------
 7 files changed, 123 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 0080134..a05d814 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -18,7 +18,6 @@
 package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.EvictingQueue;
-import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -27,9 +26,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
+import 
org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +42,11 @@ import java.util.concurrent.Callable;
 
 public class BlacklistScheduler implements IScheduler {
     private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
+
     private final IScheduler underlyingScheduler;
     @SuppressWarnings("rawtypes")
     private Map _conf;
@@ -67,51 +72,27 @@ public class BlacklistScheduler implements IScheduler {
 
     @Override
     public void prepare(Map conf) {
-        LOG.info("prepare black list scheduler");
+        LOG.info("Preparing black list scheduler");
         underlyingScheduler.prepare(conf);
         _conf = conf;
-        if 
(_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
-            toleranceTime = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
-        }
-        if 
(_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
-            toleranceCount = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
-        }
-        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
-            resumeTime = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
-        }
-        String reporterClassName = 
_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
-        try {
-            reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
-            throw new RuntimeException(e);
-        }
 
-        String strategyClassName = 
_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) ? (String) 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) : 
"org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy";
-        try {
-            blacklistStrategy = (IBlacklistStrategy) 
Class.forName(strategyClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist strategy for name {}", 
strategyClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist strategy for 
name {}", strategyClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist strategy for 
name {}", strategyClassName);
-            throw new RuntimeException(e);
-        }
+        toleranceTime = 
ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+        toleranceCount = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), 
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+
+        String reporterClassName = 
ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+                LogReporter.class.getName());
+        reporter = (IReporter) initializeInstance(reporterClassName, 
"blacklist reporter");
+
+        String strategyClassName = 
ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
+                DefaultBlacklistStrategy.class.getName());
+        blacklistStrategy = (IBlacklistStrategy) 
initializeInstance(strategyClassName, "blacklist strategy");
 
         nimbusMonitorFreqSecs = ObjectReader.getInt( 
_conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklistStrategy.prepare(_conf);
 
-        windowSize=toleranceTime / nimbusMonitorFreqSecs;
-        badSupervisorsToleranceSlidingWindow =EvictingQueue.create(windowSize);
+        windowSize = toleranceTime / nimbusMonitorFreqSecs;
+        badSupervisorsToleranceSlidingWindow = 
EvictingQueue.create(windowSize);
         cachedSupervisors = new HashMap<>();
         blacklistHost = new HashSet<>();
 
@@ -134,7 +115,9 @@ public class BlacklistScheduler implements IScheduler {
 
         blacklistStrategy.resumeFromBlacklist();
         badSupervisors(supervisors);
-        cluster.setBlacklistedHosts(getBlacklistHosts(cluster, topologies));
+        Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
+        this.blacklistHost = blacklistHosts;
+        cluster.setBlacklistedHosts(blacklistHosts);
         removeLongTimeDisappearFromCache();
 
         underlyingScheduler.schedule(topologies, cluster);
@@ -149,7 +132,7 @@ public class BlacklistScheduler implements IScheduler {
         Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
         Set<String> supervisorsKeySet = supervisors.keySet();
 
-        Set<String> badSupervisorKeys = 
Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);//cached supervisor 
doesn't show up
+        Set<String> badSupervisorKeys = 
Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); //cached 
supervisor doesn't show up
         HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
         for (String key : badSupervisorKeys) {
             badSupervisors.put(key, cachedSupervisors.get(key));
@@ -160,11 +143,11 @@ public class BlacklistScheduler implements IScheduler {
             SupervisorDetails supervisorDetails = entry.getValue();
             if (cachedSupervisors.containsKey(key)) {
                 Set<Integer> badSlots = badSlots(supervisorDetails, key);
-                if (badSlots.size() > 0) {//supervisor contains bad slots
+                if (badSlots.size() > 0) { //supervisor contains bad slots
                     badSupervisors.put(key, badSlots);
                 }
             } else {
-                cachedSupervisors.put(key, 
supervisorDetails.getAllPorts());//new supervisor to cache
+                cachedSupervisors.put(key, supervisorDetails.getAllPorts()); 
//new supervisor to cache
             }
         }
 
@@ -185,7 +168,7 @@ public class BlacklistScheduler implements IScheduler {
         return badSlots;
     }
 
-    public Set<String> getBlacklistHosts(Cluster cluster, Topologies 
topologies) {
+    private Set<String> getBlacklistHosts(Cluster cluster, Topologies 
topologies) {
         Set<String> blacklistSet = blacklistStrategy.getBlacklist(new 
ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
         Set<String> blacklistHostSet = new HashSet<>();
         for (String supervisor : blacklistSet) {
@@ -193,10 +176,9 @@ public class BlacklistScheduler implements IScheduler {
             if (host != null) {
                 blacklistHostSet.add(host);
             } else {
-                LOG.info("supervisor {} is not alive know, do not need to add 
to blacklist.", supervisor);
+                LOG.info("supervisor {} is not alive, do not need to add to 
blacklist.", supervisor);
             }
         }
-        this.blacklistHost = blacklistHostSet;
         return blacklistHostSet;
     }
 
@@ -211,20 +193,14 @@ public class BlacklistScheduler implements IScheduler {
         for (Map<String, Set<Integer>> item : 
badSupervisorsToleranceSlidingWindow) {
             Set<String> supervisors = item.keySet();
             for (String supervisor : supervisors) {
-                int supervisorCount = 0;
-                if (supervisorCountMap.containsKey(supervisor)) {
-                    supervisorCount = supervisorCountMap.get(supervisor);
-                }
+                int supervisorCount = 
supervisorCountMap.getOrDefault(supervisor, 0);
                 Set<Integer> slots = item.get(supervisor);
-                if(slots.equals(cachedSupervisors.get(supervisor))){//only all 
slots are bad means supervisor is bad
+                if (slots.equals(cachedSupervisors.get(supervisor))) { // 
treat supervisor as bad only if all slots are bad
                     supervisorCountMap.put(supervisor, supervisorCount + 1);
                 }
                 for (Integer slot : slots) {
-                    int slotCount = 0;
                     WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
-                    if (slotCountMap.containsKey(workerSlot)) {
-                        slotCount = slotCountMap.get(workerSlot);
-                    }
+                    int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
                     slotCountMap.put(workerSlot, slotCount + 1);
                 }
             }
@@ -233,9 +209,9 @@ public class BlacklistScheduler implements IScheduler {
         for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) 
{
             String key = entry.getKey();
             int value = entry.getValue();
-            if (value == windowSize) {//supervisor never exits once in 
tolerance time will be removed from cache
+            if (value == windowSize) { // supervisor which was never back to 
normal in tolerance period will be removed from cache
                 cachedSupervisors.remove(key);
-                LOG.info("supervisor {} has never exited once during tolerance 
time, proberbly be dead forever, removed from cache.", key);
+                LOG.info("Supervisor {} was never back to normal during 
tolerance period, probably dead. Will remove from cache.", key);
             }
         }
 
@@ -244,14 +220,29 @@ public class BlacklistScheduler implements IScheduler {
             String supervisorKey = workerSlot.getNodeId();
             Integer slot = workerSlot.getPort();
             int value = entry.getValue();
-            if (value == windowSize) {//port never exits once in tolerance 
time will be removed from cache
+            if (value == windowSize) { // worker slot which was never back to 
normal in tolerance period will be removed from cache
                 Set<Integer> slots = cachedSupervisors.get(supervisorKey);
-                if (slots != null) {//slots will be null while supervisor has 
been removed from cached supervisors
+                if (slots != null) { // slots will be null while supervisor 
has been removed from cached supervisors
                     slots.remove(slot);
                     cachedSupervisors.put(supervisorKey, slots);
                 }
-                LOG.info("slot {} has never exited once during tolerance time, 
proberbly be dead forever, removed from cache.", workerSlot);
+                LOG.info("Worker slot {} was never back to normal during 
tolerance period, probably dead. Will be removed from cache.", workerSlot);
             }
         }
     }
+
+    private Object initializeInstance(String className, String representation) 
{
+        try {
+            return Class.forName(className).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException {} for name {}", 
representation, className);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw IllegalAccessException {} for name {}", 
representation, className);
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 00dcaa4..781c37a 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.scheduler.blacklist.reporters;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -27,5 +27,5 @@ import java.util.Set;
 public interface IReporter {
     void report(String message);
 
-    void reportBlacklist(String supervisor, List<HashMap<String, 
Set<Integer>>> toleranceBuffer);
+    void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> 
toleranceBuffer);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index d8b7679..94cfebd 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -20,8 +20,8 @@ package org.apache.storm.scheduler.blacklist.reporters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class LogReporter implements IReporter {
@@ -33,8 +33,8 @@ public class LogReporter implements IReporter {
     }
 
     @Override
-    public void reportBlacklist(String supervisor, List<HashMap<String, 
Set<Integer>>> toleranceBuffer) {
-        String message = "add supervisor " + supervisor + " to blacklist. The 
bad slot history of supervisors is :" + toleranceBuffer;
-        report(message);
+    public void reportBlacklist(String supervisor, List<Map<String, 
Set<Integer>>> toleranceBuffer) {
+        LOG.warn("add supervisor {}  to blacklist. The bad slot history of 
supervisors is : {}",
+                supervisor, toleranceBuffer);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 9b2e9b0..cc7f403 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -24,6 +24,7 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,9 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
 
     private static Logger LOG = 
LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
 
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
+
     private IReporter _reporter;
 
     private int _toleranceCount;
@@ -49,41 +53,25 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
 
     @Override
     public void prepare(Map conf){
-        if 
(conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
-            _toleranceCount = ObjectReader.getInt( 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
-        }
-        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
-            _resumeTime = ObjectReader.getInt( 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
-        }
-        String reporterClassName = 
conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
-        try {
-            _reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
-            throw new RuntimeException(e);
-        }
+        _toleranceCount = 
ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), 
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        _resumeTime = 
ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+
+        String reporterClassName = 
ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+                LogReporter.class.getName());
+        _reporter = (IReporter) initializeInstance(reporterClassName, 
"blacklist reporter");
 
-        _nimbusMonitorFreqSecs = ObjectReader.getInt( 
conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        _nimbusMonitorFreqSecs = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklist = new TreeMap<>();
     }
 
     @Override
-    public Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> 
supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+    public Set<String> getBlacklist(List<Map<String, Set<Integer>>> 
supervisorsWithFailures, Cluster cluster, Topologies topologies) {
         Map<String, Integer> countMap = new HashMap<String, Integer>();
 
         for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
             Set<String> supervisors = item.keySet();
             for (String supervisor : supervisors) {
-                int supervisorCount = 0;
-                if (countMap.containsKey(supervisor)) {
-                    supervisorCount = countMap.get(supervisor);
-                }
+                int supervisorCount = countMap.getOrDefault(supervisor, 0);
                 countMap.put(supervisor, supervisorCount + 1);
             }
         }
@@ -91,9 +79,9 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
             String supervisor = entry.getKey();
             int count = entry.getValue();
             if (count >= _toleranceCount) {
-                if (!blacklist.containsKey(supervisor)) {// if not in 
blacklist then add it and set the resume time according to config
-                    LOG.info("add supervisor {} to blacklist", supervisor);
-                    LOG.info("supervisorsWithFailures : {}", 
supervisorsWithFailures);
+                if (!blacklist.containsKey(supervisor)) { // if not in 
blacklist then add it and set the resume time according to config
+                    LOG.debug("add supervisor {} to blacklist", supervisor);
+                    LOG.debug("supervisorsWithFailures : {}", 
supervisorsWithFailures);
                     _reporter.reportBlacklist(supervisor, 
supervisorsWithFailures);
                     blacklist.put(supervisor, _resumeTime / 
_nimbusMonitorFreqSecs);
                 }
@@ -103,6 +91,7 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
         return blacklist.keySet();
     }
 
+    @Override
     public void resumeFromBlacklist() {
         Set<String> readyToRemove = new HashSet<String>();
         for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
@@ -116,11 +105,11 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
         }
         for (String key : readyToRemove) {
             blacklist.remove(key);
-            LOG.info("supervisor {} reach the resume time ,removed from 
blacklist", key);
+            LOG.info("Supervisor {} has been blacklisted more than resume 
period. Removed from blacklist.", key);
         }
     }
 
-    public void releaseBlacklistWhenNeeded(Cluster cluster, Topologies 
topologies) {
+    private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies 
topologies) {
         if (blacklist.size() > 0) {
             int totalNeedNumWorkers = 0;
             List<TopologyDetails> needSchedulingTopologies = 
cluster.needsSchedulingTopologies();
@@ -145,12 +134,12 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
                         , totalNeedNumWorkers, 
availableSlotsNotInBlacklistCount, blacklist.size());
                 //release earliest blacklist
                 Set<String> readyToRemove = new HashSet<>();
-                for (String supervisor : blacklist.keySet()) {//blacklist is 
treeMap sorted by value, value minimum meas earliest
+                for (String supervisor : blacklist.keySet()) { //blacklist is 
treeMap sorted by value, minimum value means earliest
                     if (availableSupervisors.containsKey(supervisor)) {
                         Set<Integer> ports = 
cluster.getAvailablePorts(availableSupervisors.get(supervisor));
                         readyToRemove.add(supervisor);
                         shortage -= ports.size();
-                        if (shortage <= 0) {//released enough supervisor
+                        if (shortage <= 0) { //released enough supervisor
                             break;
                         }
                     }
@@ -162,4 +151,19 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
             }
         }
     }
+
+    private Object initializeInstance(String className, String representation) 
{
+        try {
+            return Class.forName(className).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException {} for name {}", 
representation, className);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw IllegalAccessException {} for name {}", 
representation, className);
+            throw new RuntimeException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index df79b8c..a35a1d2 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -41,7 +41,7 @@ public interface IBlacklistStrategy {
      *       the `cluster` object.
      * @return blacklisted supervisors' id set
      */
-    Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> 
badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
+    Set<String> getBlacklist(List<Map<String, Set<Integer>>> 
badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
 
     /**
      * resume supervisors form blacklist. Blacklist is just a temporary list 
for supervisors,

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index 5b8bbe7..ece6357 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -30,17 +30,6 @@ import java.util.Map;
 
 public class FaultGenerateUtils {
 
-    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int 
supervisorCount, int slotCount, int[][][] faults) {
-        List<Map<String, SupervisorDetails>> supervisorsList = new 
ArrayList<>(faults.length);
-        for (int[][] fault : faults) {
-            Map<String, SupervisorDetails> supervisors = 
TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
-            if (fault.length == 1 && fault[0][0] == -1) {
-                
TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, 
"sup-0");
-            }
-        }
-        return supervisorsList;
-    }
-
     public static List<Map<String, SupervisorDetails>> getSupervisorsList(int 
supervisorCount, int slotCount, List<Map<Integer, List<Integer>>> faultList) {
         List<Map<String, SupervisorDetails>> supervisorsList = new 
ArrayList<>(faultList.size());
         for (Map<Integer, List<Integer>> faults : faultList) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 049ef68..6cf8a0e 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -19,6 +19,7 @@ package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -75,9 +77,7 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, 
TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), 
topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), 
cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -109,9 +109,7 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), 
cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -142,16 +140,13 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), 
cluster.getBlacklistedHosts());
         for (int i = 0; i < 300 / 10 - 2; i++) {
             bs.schedule(topologies, cluster);
         }
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), 
cluster.getBlacklistedHosts());
         bs.schedule(topologies, cluster);
-        hosts.clear();
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.emptySet(), 
cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -185,17 +180,14 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, 
TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), 
topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), 
cluster.getBlacklistedHosts());
         topoMap.put(topo2.getId(), topo2);
         topoMap.put(topo3.getId(), topo3);
         topoMap.put(topo4.getId(), topo4);
         topologies = new Topologies(topoMap);
         cluster = new Cluster(iNimbus, supMap, 
TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), 
topologies, config);
         bs.schedule(topologies, cluster);
-        hosts.clear();
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.emptySet(), 
cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -218,21 +210,21 @@ public class TestBlacklistScheduler {
 
         List<Map<Integer, List<Integer>>> faultList = new ArrayList<>();
 
-        faultList.add(new HashMap<Integer, List<Integer>>());
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
-        faultList.add((Map) ImmutableMap.of(0, new ArrayList<>()));
+        faultList.add(new HashMap<>());
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1)));
+        faultList.add(ImmutableMap.of(0, new ArrayList<>()));
         for (int i = 0; i < 17; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
-        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1)));
+        faultList.add(ImmutableMap.of(1, ImmutableList.of(1)));
         for (int i = 0; i < 8; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(1)));
-        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(1, ImmutableList.of(1)));
         for (int i = 0; i < 30; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
 
         List<Map<String, SupervisorDetails>> supervisorsList = 
FaultGenerateUtils.getSupervisorsList(3, 4, faultList);
@@ -285,9 +277,9 @@ public class TestBlacklistScheduler {
 
     @Test
     public void removeLongTimeDisappearFromCache(){
-        INimbus iNimbus=new TestUtilsForBlacklistScheduler.INimbusTest();
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
 
-        Map<String, SupervisorDetails> 
supMap=TestUtilsForBlacklistScheduler.genSupervisors(3,4);
+        Map<String, SupervisorDetails> supMap = 
TestUtilsForBlacklistScheduler.genSupervisors(3,4);
 
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
@@ -301,27 +293,24 @@ public class TestBlacklistScheduler {
 
         Topologies topologies = new Topologies(topoMap);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
-        BlacklistScheduler bs=new BlacklistScheduler(new DefaultScheduler());
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
         bs.prepare(config);
         bs.schedule(topologies,cluster);
         cluster = new Cluster(iNimbus, 
TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()),
 topologies, config);
-        for(int i=0;i<20;i++){
+        for (int i = 0 ; i < 20 ; i++){
             bs.schedule(topologies,cluster);
         }
-        Set<String> cached=new HashSet<>();
+        Set<String> cached = new HashSet<>();
         cached.add("sup-1");
         cached.add("sup-2");
         Assert.assertEquals(cached,bs.cachedSupervisors.keySet());
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies,cluster);
         cluster = new Cluster(iNimbus, 
TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()),
 topologies, config);
-        for(int i=0;i<20;i++){
-            bs.schedule(topologies,cluster);
+        for (int i = 0 ;i < 20 ; i++){
+            bs.schedule(topologies, cluster);
         }
-        Set<Integer> cachedPorts=new HashSet<>();
-        cachedPorts.add(1);
-        cachedPorts.add(2);
-        cachedPorts.add(3);
-        Assert.assertEquals(cachedPorts,bs.cachedSupervisors.get("sup-0"));
+        Set<Integer> cachedPorts = Sets.newHashSet(1, 2, 3);
+        Assert.assertEquals(cachedPorts, bs.cachedSupervisors.get("sup-0"));
     }
 }

Reply via email to