Repository: storm
Updated Branches:
  refs/heads/master 293643f09 -> 177cb95f4


STORM-2083 Blacklist scheduler

Resolved conflict via Jungtaek Lim <[email protected]>

This commit squashes the commits into one and commit messages were below:

1. add apache header
2. rename method
3. move config define to Config
4. code style fix
5. change debug log level to debug

remove all blacklist enable config

remove unused default value

1.storm blacklist code style, header and other bugs
2.wrap blacklist scheduler in nimbus and rebase to master

change blacklist-scheduler schedule method log level.

rename some variables and refactor badSlots args

add blacklist.scheduler to default.yaml

1. removeLongTimeDisappearFromCache bug fix
2. add unit test for removeLongTimeDisappearFromCache
3. change blacklistScheduler fields to protected so it can be visited from 
sub-class and unit tests

1. remove CircularBuffer and replace it with guava EvictingQueue.
2. modify nimbus_test.clj to adapt blacklistScheduler
3. comments, Utils.getInt, DefaultBlacklistStrategy.prepare with conf


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25c27c6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25c27c6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25c27c6d

Branch: refs/heads/master
Commit: 25c27c6d3a3680c6c3cf4d4e0a0dc0a1c18b82e8
Parents: 9d5515a
Author: howard.li <[email protected]>
Authored: Thu Jul 7 18:32:30 2016 +0800
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Sep 27 14:09:27 2017 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   6 +
 .../src/jvm/org/apache/storm/Config.java        |   6 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  18 +-
 storm-server/pom.xml                            |   2 +-
 .../java/org/apache/storm/DaemonConfig.java     |  39 +++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   8 +-
 .../scheduler/blacklist/BlacklistScheduler.java | 257 +++++++++++++++
 .../apache/storm/scheduler/blacklist/Sets.java  |  58 ++++
 .../blacklist/reporters/IReporter.java          |  31 ++
 .../blacklist/reporters/LogReporter.java        |  40 +++
 .../strategies/DefaultBlacklistStrategy.java    | 165 ++++++++++
 .../strategies/IBlacklistStrategy.java          |  53 +++
 .../scheduler/blacklist/FaultGenerateUtils.java |  74 +++++
 .../blacklist/TestBlacklistScheduler.java       | 327 +++++++++++++++++++
 .../TestUtilsForBlacklistScheduler.java         | 263 +++++++++++++++
 15 files changed, 1333 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 16df356..1478490 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -274,6 +274,12 @@ topology.scheduler.strategy: 
"org.apache.storm.scheduler.resource.strategies.sch
 resource.aware.scheduler.eviction.strategy: 
"org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
 resource.aware.scheduler.priority.strategy: 
"org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
 
+blacklist.scheduler.tolerance.time.secs: 300
+blacklist.scheduler.tolerance.count: 3
+blacklist.scheduler.resume.time.secs: 1800
+blacklist.scheduler.reporter: 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter"
+blacklist.scheduler.strategy: 
"org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy"
+
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
 
 pacemaker.servers: []

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index 7149631..e296e8f 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1515,8 +1515,8 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port";
 
     /*
- * How often to poll Exhibitor cluster in millis.
- */
+     * How often to poll Exhibitor cluster in millis.
+     */
     @isString
     public static final String 
STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath";
 
@@ -1532,7 +1532,7 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String 
STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times";
 
-    /**
+    /*
      * The interval between retries of an Exhibitor operation.
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index dcea44e..38b5da0 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -55,8 +55,12 @@
 (def ^:dynamic *STORM-CONF* (clojurify-structure 
(ConfigUtils/readStormConfig)))
 
 (defn- mk-nimbus
-  [conf inimbus blob-store leader-elector group-mapper cluster-state]
-  (Nimbus. conf inimbus cluster-state nil blob-store nil leader-elector 
group-mapper))
+  ([conf inimbus]
+   (mk-nimbus conf inimbus nil nil nil nil))
+  ([conf inimbus blob-store leader-elector group-mapper cluster-state]
+    ;blacklist scheduler requires nimbus-monitor-freq-secs as input parameter.
+   (let [conf-with-nimbus-monitor-freq (merge {NIMBUS-MONITOR-FREQ-SECS 10} 
conf)]
+     (Nimbus. conf-with-nimbus-monitor-freq inimbus cluster-state nil 
blob-store nil leader-elector group-mapper))))
 
 (defn- from-json
        [^String str]
@@ -1635,7 +1639,7 @@
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
                           (zkLeaderElectorImpl [conf blob-store tc] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
-          (Nimbus. auth-conf fake-inimbus)
+          (mk-nimbus auth-conf fake-inimbus)
           (.mkStormClusterStateImpl (Mockito/verify cluster-utils 
(Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
           ))))
 
@@ -1895,7 +1899,7 @@
         hb-cache (into {}(map vector inactive-topos '(nil nil)))
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        conf {}]
+        conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil 
mock-blob-store nil nil))]
@@ -1940,7 +1944,7 @@
         hb-cache {"topo1" nil "topo2" nil}
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        conf {}]
+        conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil 
mock-blob-store nil nil))]
@@ -1975,7 +1979,7 @@
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         mock-tc (Mockito/mock TopoCache)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc 
(MockLeaderElector. ) nil)]
+        nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil 
mock-blob-store mock-tc (MockLeaderElector. ) nil)]
     (let [supervisor1-topologies (clojurify-structure 
(Nimbus/topologiesOnSupervisor assignments "super1"))
           user1-topologies (clojurify-structure (.filterAuthorized nimbus 
"getTopology" supervisor1-topologies))
           supervisor2-topologies (clojurify-structure 
(Nimbus/topologiesOnSupervisor assignments "super2"))
@@ -1996,7 +2000,7 @@
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         mock-tc (Mockito/mock TopoCache)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc 
(MockLeaderElector. ) nil)]
+        nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil 
mock-blob-store mock-tc (MockLeaderElector. ) nil)]
     (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq 
"authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
     (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "topo1") 
(Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
     (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context 
operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index af95261..b57151c 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -136,7 +136,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>3626</maxAllowedViolations>
+                    <maxAllowedViolations>4000</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 75a9cad..c5ee27a 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,6 +18,13 @@
 
 package org.apache.storm;
 
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
+import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
@@ -103,6 +110,38 @@ public class DaemonConfig implements Validated {
     public static final String STORM_SCHEDULER = "storm.scheduler";
 
     /**
+     * The number of seconds for which the blacklist scheduler will track bad slots or supervisors.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = 
"blacklist.scheduler.tolerance.time.secs";
+
+    /**
+     * The number of hits within the tolerance time that will trigger blacklisting.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 
"blacklist.scheduler.tolerance.count";
+
+    /**
+     * The number of seconds after which blacklisted slots or supervisors will be resumed.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_RESUME_TIME = 
"blacklist.scheduler.resume.time.secs";
+
+    /**
+     * The class that the blacklist scheduler uses to report the blacklist.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IReporter.class)
+    public static final String BLACKLIST_SCHEDULER_REPORTER = 
"blacklist.scheduler.reporter";
+
+    /**
+     * The class that specifies the blacklist strategy to use in the blacklist scheduler.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
+    public static final String BLACKLIST_SCHEDULER_STRATEGY = 
"blacklist.scheduler.strategy";
+
+    /**
      * Whether we want to display all the resource capacity and scheduled 
usage on the UI page.
      * You MUST have this variable set if you are using any kind of 
resource-related scheduler.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index d6b6ab6..7331906 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -147,6 +147,7 @@ import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.Cluster.SupervisorResources;
+import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
@@ -472,8 +473,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             LOG.info("Using default scheduler");
             scheduler = new DefaultScheduler();
         }
-        scheduler.prepare(conf);
-        return scheduler;
+        BlacklistScheduler blacklistWrappedScheduler = new 
BlacklistScheduler(scheduler);
+        blacklistWrappedScheduler.prepare(conf);
+        return blacklistWrappedScheduler;
     }
 
     /**
@@ -3867,7 +3869,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public NimbusSummary getLeader() throws AuthorizationException, TException 
{
         getLeaderCalls.mark();

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
new file mode 100644
index 0000000..0080134
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+    private final IScheduler underlyingScheduler;
+    @SuppressWarnings("rawtypes")
+    private Map _conf;
+
+    protected int toleranceTime;
+    protected int toleranceCount;
+    protected int resumeTime;
+    protected IReporter reporter;
+    protected IBlacklistStrategy blacklistStrategy;
+
+    protected int nimbusMonitorFreqSecs;
+
+    protected Map<String, Set<Integer>> cachedSupervisors;
+
+    //key is supervisor key ,value is supervisor ports
+    protected EvictingQueue<HashMap<String, Set<Integer>>> 
badSupervisorsToleranceSlidingWindow;
+    protected int windowSize;
+    protected Set<String> blacklistHost;
+
+    public BlacklistScheduler(IScheduler underlyingScheduler) {
+        this.underlyingScheduler = underlyingScheduler;
+    }
+
+    @Override
+    public void prepare(Map conf) {
+        LOG.info("prepare black list scheduler");
+        underlyingScheduler.prepare(conf);
+        _conf = conf;
+        if 
(_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+            toleranceTime = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
+        }
+        if 
(_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
+            toleranceCount = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
+        }
+        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+            resumeTime = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
+        }
+        String reporterClassName = 
_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
+        try {
+            reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
+            throw new RuntimeException(e);
+        }
+
+        String strategyClassName = 
_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) ? (String) 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) : 
"org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy";
+        try {
+            blacklistStrategy = (IBlacklistStrategy) 
Class.forName(strategyClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist strategy for name {}", 
strategyClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist strategy for 
name {}", strategyClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist strategy for 
name {}", strategyClassName);
+            throw new RuntimeException(e);
+        }
+
+        nimbusMonitorFreqSecs = ObjectReader.getInt( 
_conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklistStrategy.prepare(_conf);
+
+        windowSize=toleranceTime / nimbusMonitorFreqSecs;
+        badSupervisorsToleranceSlidingWindow =EvictingQueue.create(windowSize);
+        cachedSupervisors = new HashMap<>();
+        blacklistHost = new HashSet<>();
+
+        
StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new 
Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+                //nimbus:num-blacklisted-supervisor + none blacklisted 
supervisor = nimbus:num-supervisors
+                return blacklistHost.size();
+            }
+        });
+    }
+
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        LOG.debug("running Black List scheduler");
+        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
+        LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
+        LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
+        LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
+
+        blacklistStrategy.resumeFromBlacklist();
+        badSupervisors(supervisors);
+        cluster.setBlacklistedHosts(getBlacklistHosts(cluster, topologies));
+        removeLongTimeDisappearFromCache();
+
+        underlyingScheduler.schedule(topologies, cluster);
+    }
+
+    @Override
+    public Map<String, Object> config() {
+        return underlyingScheduler.config();
+    }
+
+    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
+        Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
+        Set<String> supervisorsKeySet = supervisors.keySet();
+
+        Set<String> badSupervisorKeys = 
Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);//cached supervisor 
doesn't show up
+        HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
+        for (String key : badSupervisorKeys) {
+            badSupervisors.put(key, cachedSupervisors.get(key));
+        }
+
+        for (Map.Entry<String, SupervisorDetails> entry : 
supervisors.entrySet()) {
+            String key = entry.getKey();
+            SupervisorDetails supervisorDetails = entry.getValue();
+            if (cachedSupervisors.containsKey(key)) {
+                Set<Integer> badSlots = badSlots(supervisorDetails, key);
+                if (badSlots.size() > 0) {//supervisor contains bad slots
+                    badSupervisors.put(key, badSlots);
+                }
+            } else {
+                cachedSupervisors.put(key, 
supervisorDetails.getAllPorts());//new supervisor to cache
+            }
+        }
+
+        badSupervisorsToleranceSlidingWindow.add(badSupervisors);
+    }
+
+    private Set<Integer> badSlots(SupervisorDetails supervisor, String 
supervisorKey) {
+        Set<Integer> cachedSupervisorPorts = 
cachedSupervisors.get(supervisorKey);
+        Set<Integer> supervisorPorts = supervisor.getAllPorts();
+
+        Set<Integer> newPorts = Sets.difference(supervisorPorts, 
cachedSupervisorPorts);
+        if (newPorts.size() > 0) {
+            //add new ports to cached supervisor
+            cachedSupervisors.put(supervisorKey, Sets.union(newPorts, 
cachedSupervisorPorts));
+        }
+
+        Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, 
supervisorPorts);
+        return badSlots;
+    }
+
+    public Set<String> getBlacklistHosts(Cluster cluster, Topologies 
topologies) {
+        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new 
ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
+        Set<String> blacklistHostSet = new HashSet<>();
+        for (String supervisor : blacklistSet) {
+            String host = cluster.getHost(supervisor);
+            if (host != null) {
+                blacklistHostSet.add(host);
+            } else {
+                LOG.info("supervisor {} is not alive know, do not need to add 
to blacklist.", supervisor);
+            }
+        }
+        this.blacklistHost = blacklistHostSet;
+        return blacklistHostSet;
+    }
+
+    /**
+     * supervisor or port never exits once in tolerance time will be removed 
from cache.
+     */
+    private void removeLongTimeDisappearFromCache() {
+
+        Map<String, Integer> supervisorCountMap = new HashMap<String, 
Integer>();
+        Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, 
Integer>();
+
+        for (Map<String, Set<Integer>> item : 
badSupervisorsToleranceSlidingWindow) {
+            Set<String> supervisors = item.keySet();
+            for (String supervisor : supervisors) {
+                int supervisorCount = 0;
+                if (supervisorCountMap.containsKey(supervisor)) {
+                    supervisorCount = supervisorCountMap.get(supervisor);
+                }
+                Set<Integer> slots = item.get(supervisor);
+                if(slots.equals(cachedSupervisors.get(supervisor))){//only all 
slots are bad means supervisor is bad
+                    supervisorCountMap.put(supervisor, supervisorCount + 1);
+                }
+                for (Integer slot : slots) {
+                    int slotCount = 0;
+                    WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
+                    if (slotCountMap.containsKey(workerSlot)) {
+                        slotCount = slotCountMap.get(workerSlot);
+                    }
+                    slotCountMap.put(workerSlot, slotCount + 1);
+                }
+            }
+        }
+
+        for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) 
{
+            String key = entry.getKey();
+            int value = entry.getValue();
+            if (value == windowSize) {//supervisor never exits once in 
tolerance time will be removed from cache
+                cachedSupervisors.remove(key);
+                LOG.info("supervisor {} has never exited once during tolerance 
time, proberbly be dead forever, removed from cache.", key);
+            }
+        }
+
+        for (Map.Entry<WorkerSlot, Integer> entry : slotCountMap.entrySet()) {
+            WorkerSlot workerSlot = entry.getKey();
+            String supervisorKey = workerSlot.getNodeId();
+            Integer slot = workerSlot.getPort();
+            int value = entry.getValue();
+            if (value == windowSize) {//port never exits once in tolerance 
time will be removed from cache
+                Set<Integer> slots = cachedSupervisors.get(supervisorKey);
+                if (slots != null) {//slots will be null while supervisor has 
been removed from cached supervisors
+                    slots.remove(slot);
+                    cachedSupervisors.put(supervisorKey, slots);
+                }
+                LOG.info("slot {} has never exited once during tolerance time, 
proberbly be dead forever, removed from cache.", workerSlot);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
new file mode 100644
index 0000000..93c38cb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class Sets {
+
+    public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.addAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.retainAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> difference(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.removeAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) {
+        Set<T> union = union(setA, setB);
+        Set<T> intersection = intersection(setA, setB);
+        return difference(union, intersection);
+    }
+
+    public static <T> boolean isSubset(Set<T> setA, Set<T> setB) {
+        return setB.containsAll(setA);
+    }
+
+
+    public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) {
+        return setA.containsAll(setB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
new file mode 100644
index 0000000..00dcaa4
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.reporters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
/**
 * Reports blacklist events to an external alerting system (e.g. a log).
 */
public interface IReporter {
    /** Reports a free-form message about a blacklist event. */
    void report(String message);

    /**
     * Reports that the given supervisor has been blacklisted.
     *
     * @param supervisor      id of the blacklisted supervisor
     * @param toleranceBuffer recent history of bad slots: one map per scheduling
     *                        round, keyed by supervisor id with the bad ports as
     *                        value (the BlacklistScheduler sliding window)
     */
    void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
new file mode 100644
index 0000000..d8b7679
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.reporters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+public class LogReporter implements IReporter {
+    private static Logger LOG = LoggerFactory.getLogger(LogReporter.class);
+
+    @Override
+    public void report(String message) {
+        LOG.warn(message);
+    }
+
+    @Override
+    public void reportBlacklist(String supervisor, List<HashMap<String, 
Set<Integer>>> toleranceBuffer) {
+        String message = "add supervisor " + supervisor + " to blacklist. The 
bad slot history of supervisors is :" + toleranceBuffer;
+        report(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
new file mode 100644
index 0000000..9b2e9b0
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+
+public class DefaultBlacklistStrategy implements IBlacklistStrategy {
+
+    private static Logger LOG = 
LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
+
+    private IReporter _reporter;
+
+    private int _toleranceCount;
+    private int _resumeTime;
+    private int _nimbusMonitorFreqSecs;
+
+    private TreeMap<String, Integer> blacklist;
+
+    @Override
+    public void prepare(Map conf){
+        if 
(conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
+            _toleranceCount = ObjectReader.getInt( 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
+        }
+        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+            _resumeTime = ObjectReader.getInt( 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
+        }
+        String reporterClassName = 
conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
+        try {
+            _reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
+            throw new RuntimeException(e);
+        }
+
+        _nimbusMonitorFreqSecs = ObjectReader.getInt( 
conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklist = new TreeMap<>();
+    }
+
+    @Override
+    public Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> 
supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+        Map<String, Integer> countMap = new HashMap<String, Integer>();
+
+        for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
+            Set<String> supervisors = item.keySet();
+            for (String supervisor : supervisors) {
+                int supervisorCount = 0;
+                if (countMap.containsKey(supervisor)) {
+                    supervisorCount = countMap.get(supervisor);
+                }
+                countMap.put(supervisor, supervisorCount + 1);
+            }
+        }
+        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
+            String supervisor = entry.getKey();
+            int count = entry.getValue();
+            if (count >= _toleranceCount) {
+                if (!blacklist.containsKey(supervisor)) {// if not in 
blacklist then add it and set the resume time according to config
+                    LOG.info("add supervisor {} to blacklist", supervisor);
+                    LOG.info("supervisorsWithFailures : {}", 
supervisorsWithFailures);
+                    _reporter.reportBlacklist(supervisor, 
supervisorsWithFailures);
+                    blacklist.put(supervisor, _resumeTime / 
_nimbusMonitorFreqSecs);
+                }
+            }
+        }
+        releaseBlacklistWhenNeeded(cluster, topologies);
+        return blacklist.keySet();
+    }
+
+    public void resumeFromBlacklist() {
+        Set<String> readyToRemove = new HashSet<String>();
+        for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
+            String key = entry.getKey();
+            int value = entry.getValue() - 1;
+            if (value == 0) {
+                readyToRemove.add(key);
+            } else {
+                blacklist.put(key, value);
+            }
+        }
+        for (String key : readyToRemove) {
+            blacklist.remove(key);
+            LOG.info("supervisor {} reach the resume time ,removed from 
blacklist", key);
+        }
+    }
+
+    public void releaseBlacklistWhenNeeded(Cluster cluster, Topologies 
topologies) {
+        if (blacklist.size() > 0) {
+            int totalNeedNumWorkers = 0;
+            List<TopologyDetails> needSchedulingTopologies = 
cluster.needsSchedulingTopologies();
+            for (TopologyDetails topologyDetails : needSchedulingTopologies) {
+                int numWorkers = topologyDetails.getNumWorkers();
+                int assignedNumWorkers = 
cluster.getAssignedNumWorkers(topologyDetails);
+                int unAssignedNumWorkers = numWorkers - assignedNumWorkers;
+                totalNeedNumWorkers += unAssignedNumWorkers;
+            }
+            Map<String, SupervisorDetails> availableSupervisors = 
cluster.getSupervisors();
+            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
+            int availableSlotsNotInBlacklistCount = 0;
+            for (WorkerSlot slot : availableSlots) {
+                if (!blacklist.containsKey(slot.getNodeId())) {
+                    availableSlotsNotInBlacklistCount += 1;
+                }
+            }
+            int shortage = totalNeedNumWorkers - 
availableSlotsNotInBlacklistCount;
+
+            if (shortage > 0) {
+                LOG.info("total needed num of workers :{}, available num of 
slots not in blacklist :{},num blacklist :{}, will release some blacklist."
+                        , totalNeedNumWorkers, 
availableSlotsNotInBlacklistCount, blacklist.size());
+                //release earliest blacklist
+                Set<String> readyToRemove = new HashSet<>();
+                for (String supervisor : blacklist.keySet()) {//blacklist is 
treeMap sorted by value, value minimum meas earliest
+                    if (availableSupervisors.containsKey(supervisor)) {
+                        Set<Integer> ports = 
cluster.getAvailablePorts(availableSupervisors.get(supervisor));
+                        readyToRemove.add(supervisor);
+                        shortage -= ports.size();
+                        if (shortage <= 0) {//released enough supervisor
+                            break;
+                        }
+                    }
+                }
+                for (String key : readyToRemove) {
+                    blacklist.remove(key);
+                    LOG.info("release supervisor {} for shortage of worker 
slots.", key);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
new file mode 100644
index 0000000..df79b8c
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
 * Strategy that decides which supervisors should be blacklisted and when
 * they should be resumed (removed from the blacklist) again.
 */
public interface IBlacklistStrategy {

    /**
     * Initialize the strategy from the daemon configuration.
     *
     * @param conf the full storm configuration map
     */
    void prepare(Map conf);

    /**
     * Get blacklist by blacklist strategy.
     *
     * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in the sliding window;
     *       each element maps a supervisor id to the set of its failed ports in one round
     * @param cluster the cluster these topologies are running in. `cluster` contains everything needed
     *       to develop a new scheduling logic, e.g. supervisors information, available slots, current
     *       assignments for all the topologies etc. New assignments can be set for topologies using
     *       `cluster.setAssignmentById()`
     * @param topologies all the topologies in the cluster, some of them need scheduling. The Topologies
     *       object here only contains static information about topologies. Information like assignments
     *       and slots are all in the `cluster` object.
     * @return blacklisted supervisors' id set
     */
    Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);

    /**
     * Resume supervisors from the blacklist. The blacklist is only a temporary
     * list for supervisors; without resuming, available resources would keep
     * shrinking. This is called every round before getBlacklist() and schedule.
     */
    void resumeFromBlacklist();

}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
new file mode 100644
index 0000000..5b8bbe7
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FaultGenerateUtils {
+
+    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int 
supervisorCount, int slotCount, int[][][] faults) {
+        List<Map<String, SupervisorDetails>> supervisorsList = new 
ArrayList<>(faults.length);
+        for (int[][] fault : faults) {
+            Map<String, SupervisorDetails> supervisors = 
TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
+            if (fault.length == 1 && fault[0][0] == -1) {
+                
TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, 
"sup-0");
+            }
+        }
+        return supervisorsList;
+    }
+
+    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int 
supervisorCount, int slotCount, List<Map<Integer, List<Integer>>> faultList) {
+        List<Map<String, SupervisorDetails>> supervisorsList = new 
ArrayList<>(faultList.size());
+        for (Map<Integer, List<Integer>> faults : faultList) {
+            Map<String, SupervisorDetails> supervisors = 
TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
+            for (Map.Entry<Integer, List<Integer>> fault : faults.entrySet()) {
+                int supervisor = fault.getKey();
+                List<Integer> slots = fault.getValue();
+                if (slots.isEmpty()) {
+                    supervisors = 
TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, 
"sup-" + supervisor);
+                } else {
+                    for (int slot : slots) {
+                        supervisors = 
TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-" + 
supervisor, slot);
+                    }
+                }
+            }
+            supervisorsList.add(supervisors);
+        }
+        return supervisorsList;
+    }
+
+    public static Cluster nextCluster(Cluster cluster, Map<String, 
SupervisorDetails> supervisors, INimbus iNimbus, Map config,
+                                      Topologies topologies) {
+        Map<String, SchedulerAssignmentImpl> assignment;
+        if (cluster == null) {
+            assignment = new HashMap<>();
+        } else {
+            assignment = 
TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments());
+        }
+        return new Cluster(iNimbus, supervisors, assignment, topologies, 
config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
new file mode 100644
index 0000000..049ef68
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.DefaultScheduler;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestBlacklistScheduler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestBlacklistScheduler.class);
+
+    private static int currentTime = 1468216504;
+
    /**
     * A supervisor that disappears for as many rounds as the tolerance count
     * must end up blacklisted (by host) even after it comes back.
     */
    @Test
    public void TestBadSupervisor() {
        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();

        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();

        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
        topoMap.put(topo1.getId(), topo1);

        Topologies topologies = new Topologies(topoMap);
        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
        bs.prepare(config);
        bs.schedule(topologies, cluster);
        // sup-0 disappears for two consecutive rounds (tolerance count is 2);
        // assignments are carried over between rounds.
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        // sup-0 comes back, but its host should now be on the blacklist.
        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        Set<String> hosts = new HashSet<>();
        hosts.add("host-0");
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
    }
+
    /**
     * A supervisor repeatedly losing a single slot (port 0 of sup-0) for as
     * many rounds as the tolerance count must get its host blacklisted.
     */
    @Test
    public void TestBadSlot() {
        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();

        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();

        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
        topoMap.put(topo1.getId(), topo1);

        Topologies topologies = new Topologies(topoMap);
        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
        bs.prepare(config);
        bs.schedule(topologies, cluster);

        // Port 0 of sup-0 fails in two consecutive rounds (tolerance count is 2).
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        // The slot is healthy again, but the host should now be blacklisted.
        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        bs.schedule(topologies, cluster);
        Set<String> hosts = new HashSet<>();
        hosts.add("host-0");
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
    }
+
    /**
     * A blacklisted supervisor must be resumed (removed from the blacklist)
     * automatically once the configured resume time has elapsed, i.e. after
     * resumeTime / monitorFreqSecs scheduling rounds.
     */
    @Test
    public void TestResumeBlacklist() {
        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();

        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();

        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
        topoMap.put(topo1.getId(), topo1);

        Topologies topologies = new Topologies(topoMap);
        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
        bs.prepare(config);
        bs.schedule(topologies, cluster);
        // sup-0 disappears for two rounds to trigger blacklisting.
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        bs.schedule(topologies, cluster);
        Set<String> hosts = new HashSet<>();
        hosts.add("host-0");
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
        // resumeTime=300s at a 10s monitor frequency means 30 rounds; run
        // almost all of them — the host must still be blacklisted.
        for (int i = 0; i < 300 / 10 - 2; i++) {
            bs.schedule(topologies, cluster);
        }
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
        // The final round crosses the resume time: blacklist becomes empty.
        bs.schedule(topologies, cluster);
        hosts.clear();
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
    }
+
    /**
     * When newly submitted topologies need more workers than the
     * non-blacklisted slots can provide, the blacklist must be released
     * early to free the missing capacity.
     */
    @Test
    public void TestReleaseBlacklist() {
        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();

        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();

        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
        TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, true);
        TopologyDetails topo3 = TestUtilsForBlacklistScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, true);
        TopologyDetails topo4 = TestUtilsForBlacklistScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 32, true);
        topoMap.put(topo1.getId(), topo1);

        Topologies topologies = new Topologies(topoMap);
        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
        bs.prepare(config);
        bs.schedule(topologies, cluster);
        // Blacklist host-0 by making sup-0 disappear for two rounds.
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        Set<String> hosts = new HashSet<>();
        hosts.add("host-0");
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
        // Submit three more topologies: the pending worker demand now exceeds
        // the non-blacklisted capacity, so the blacklist must be released.
        topoMap.put(topo2.getId(), topo2);
        topoMap.put(topo3.getId(), topo3);
        topoMap.put(topo4.getId(), topo4);
        topologies = new Topologies(topoMap);
        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
        bs.schedule(topologies, cluster);
        hosts.clear();
        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
    }
+
    /**
     * End-to-end scenario driven by a scripted fault timeline: each entry of
     * faultList describes one scheduling round (supervisor index -> failed
     * slots; an empty slot list means the whole supervisor disappears).
     * The blacklist content is checked at selected round numbers.
     */
    @Test
    public void TestList() {
        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
        Config config = new Config();
        config.putAll(Utils.readDefaultConfig());
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
        TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 2, true);
        topoMap.put(topo1.getId(), topo1);
        topoMap.put(topo2.getId(), topo2);
        Topologies topologies = new Topologies(topoMap);
        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
        bs.prepare(config);

        // Fault timeline: rounds 1-2 break sup-0 (slots then whole supervisor),
        // then 17 healthy rounds, rounds 20-21 break sup-0 and sup-1, then
        // 8 healthy rounds, rounds 30-31 break them again, then 30 healthy
        // rounds during which the blacklist is resumed.
        List<Map<Integer, List<Integer>>> faultList = new ArrayList<>();

        faultList.add(new HashMap<Integer, List<Integer>>());
        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
        faultList.add((Map) ImmutableMap.of(0, new ArrayList<>()));
        for (int i = 0; i < 17; i++) {
            faultList.add(new HashMap<Integer, List<Integer>>());
        }
        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
        for (int i = 0; i < 8; i++) {
            faultList.add(new HashMap<Integer, List<Integer>>());
        }
        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(1)));
        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
        for (int i = 0; i < 30; i++) {
            faultList.add(new HashMap<Integer, List<Integer>>());
        }

        List<Map<String, SupervisorDetails>> supervisorsList = FaultGenerateUtils.getSupervisorsList(3, 4, faultList);
        Cluster cluster = null;
        int count = 0;
        for (Map<String, SupervisorDetails> supervisors : supervisorsList) {
            cluster = FaultGenerateUtils.nextCluster(cluster, supervisors, iNimbus, config, topologies);
            bs.schedule(topologies, cluster);
            if (count == 0) {
                // No faults yet: empty blacklist.
                Set<String> hosts = new HashSet<>();
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 2) {
                // Only one bad window so far: still below tolerance count.
                Set<String> hosts = new HashSet<>();
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 3) {
                // Second bad window for sup-0: host-0 enters the blacklist.
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 30) {
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 31) {
                // sup-1 also reached the tolerance count: both hosts blacklisted.
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                hosts.add("host-1");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 32) {
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                hosts.add("host-1");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 60) {
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                hosts.add("host-1");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 61) {
                // host-1's resume time elapsed first; host-0 remains.
                Set<String> hosts = new HashSet<>();
                hosts.add("host-0");
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            } else if (count == 62) {
                // host-0 resumed as well: blacklist is empty again.
                Set<String> hosts = new HashSet<>();
                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
            }
            count++;
        }

    }
+
+    @Test
+    public void removeLongTimeDisappearFromCache(){
+        INimbus iNimbus=new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> 
supMap=TestUtilsForBlacklistScheduler.genSupervisors(3,4);
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME,200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT,2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME,300);
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+
+        TopologyDetails topo1 = 
TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2,true);
+        topoMap.put(topo1.getId(), topo1);
+
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
+        BlacklistScheduler bs=new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+        bs.schedule(topologies,cluster);
+        cluster = new Cluster(iNimbus, 
TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()),
 topologies, config);
+        for(int i=0;i<20;i++){
+            bs.schedule(topologies,cluster);
+        }
+        Set<String> cached=new HashSet<>();
+        cached.add("sup-1");
+        cached.add("sup-2");
+        Assert.assertEquals(cached,bs.cachedSupervisors.keySet());
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, 
SchedulerAssignmentImpl>(), topologies, config);
+        bs.schedule(topologies,cluster);
+        cluster = new Cluster(iNimbus, 
TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()),
 topologies, config);
+        for(int i=0;i<20;i++){
+            bs.schedule(topologies,cluster);
+        }
+        Set<Integer> cachedPorts=new HashSet<>();
+        cachedPorts.add(1);
+        cachedPorts.add(2);
+        cachedPorts.add(3);
+        Assert.assertEquals(cachedPorts,bs.cachedSupervisors.get("sup-0"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
new file mode 100644
index 0000000..71055b0
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.scheduler.blacklist;

import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

+
+public class TestUtilsForBlacklistScheduler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestUtilsForBlacklistScheduler.class);
+
+    public static Map<String, SupervisorDetails> 
removeSupervisorFromSupervisors(Map<String, SupervisorDetails> 
supervisorDetailsMap, String supervisor) {
+        Map<String, SupervisorDetails> retList = new HashMap<String, 
SupervisorDetails>();
+        retList.putAll(supervisorDetailsMap);
+        retList.remove(supervisor);
+        return retList;
+    }
+
+    public static Map<String, SupervisorDetails> 
removePortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, 
String supervisor, int port) {
+        Map<String, SupervisorDetails> retList = new HashMap<String, 
SupervisorDetails>();
+        for (Map.Entry<String, SupervisorDetails> supervisorDetailsEntry : 
supervisorDetailsMap.entrySet()) {
+            String supervisorKey = supervisorDetailsEntry.getKey();
+            SupervisorDetails supervisorDetails = 
supervisorDetailsEntry.getValue();
+            Set<Integer> ports = new HashSet<>();
+            ports.addAll(supervisorDetails.getAllPorts());
+            if (supervisorKey.equals(supervisor)) {
+                ports.remove(port);
+            }
+            SupervisorDetails sup = new 
SupervisorDetails(supervisorDetails.getId(), supervisorDetails.getHost(), null, 
(HashSet) ports, null);
+            retList.put(sup.getId(), sup);
+        }
+        return retList;
+    }
+
+    public static Map<String, SupervisorDetails> genSupervisors(int numSup, 
int numPorts) {
+        Map<String, SupervisorDetails> retList = new HashMap<String, 
SupervisorDetails>();
+        for (int i = 0; i < numSup; i++) {
+            List<Number> ports = new LinkedList<Number>();
+            for (int j = 0; j < numPorts; j++) {
+                ports.add(j);
+            }
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" 
+ i, null, ports, null);
+            retList.put(sup.getId(), sup);
+        }
+        return retList;
+    }
+
+
+    public static TopologyDetails getTopology(String name, Map config, int 
numSpout, int numBolt,
+                                              int spoutParallelism, int 
boltParallelism, int launchTime, boolean blacklistEnable) {
+
+        Config conf = new Config();
+        conf.putAll(config);
+        conf.put(Config.TOPOLOGY_NAME, name);
+        StormTopology topology = buildTopology(numSpout, numBolt, 
spoutParallelism, boltParallelism);
+        TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, 
conf, topology,
+                3, genExecsAndComps(topology, spoutParallelism, 
boltParallelism), launchTime, "user");
+        return topo;
+    }
+
+    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology 
topology, int spoutParallelism, int boltParallelism) {
+        Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, 
String>();
+        int startTask = 0;
+        int endTask = 1;
+        for (Map.Entry<String, SpoutSpec> entry : 
topology.get_spouts().entrySet()) {
+            for (int i = 0; i < spoutParallelism; i++) {
+                retMap.put(new ExecutorDetails(startTask, endTask), 
entry.getKey());
+                startTask++;
+                endTask++;
+            }
+        }
+
+        for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+            for (int i = 0; i < boltParallelism; i++) {
+                retMap.put(new ExecutorDetails(startTask, endTask), 
entry.getKey());
+                startTask++;
+                endTask++;
+            }
+        }
+        return retMap;
+    }
+
+    public static StormTopology buildTopology(int numSpout, int numBolt,
+                                              int spoutParallelism, int 
boltParallelism) {
+        LOG.debug("buildTopology with -> numSpout: " + numSpout + " 
spoutParallelism: "
+                + spoutParallelism + " numBolt: "
+                + numBolt + " boltParallelism: " + boltParallelism);
+        TopologyBuilder builder = new TopologyBuilder();
+
+        for (int i = 0; i < numSpout; i++) {
+            SpoutDeclarer s1 = builder.setSpout("spout-" + i, new TestSpout(),
+                    spoutParallelism);
+        }
+        int j = 0;
+        for (int i = 0; i < numBolt; i++) {
+            if (j >= numSpout) {
+                j = 0;
+            }
+            BoltDeclarer b1 = builder.setBolt("bolt-" + i, new TestBolt(),
+                    boltParallelism).shuffleGrouping("spout-" + j);
+        }
+
+        return builder.createTopology();
+    }
+
+    public static class TestSpout extends BaseRichSpout {
+        boolean _isDistributed;
+        SpoutOutputCollector _collector;
+
+        public TestSpout() {
+            this(true);
+        }
+
+        public TestSpout(boolean isDistributed) {
+            _isDistributed = isDistributed;
+        }
+
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            _collector = collector;
+        }
+
+        public void close() {
+        }
+
+        public void nextTuple() {
+            Utils.sleep(100);
+            final String[] words = new String[]{"nathan", "mike", "jackson", 
"golda", "bertels"};
+            final Random rand = new Random();
+            final String word = words[rand.nextInt(words.length)];
+            _collector.emit(new Values(word));
+        }
+
+        public void ack(Object msgId) {
+        }
+
+        public void fail(Object msgId) {
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            if (!_isDistributed) {
+                Map<String, Object> ret = new HashMap<String, Object>();
+                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
+                return ret;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public static class TestBolt extends BaseRichBolt {
+        OutputCollector _collector;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context,
+                            OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+    public static class INimbusTest implements INimbus {
+        @Override
+        public void prepare(Map stormConf, String schedulerLocalDir) {
+
+        }
+
+        @Override
+        public Collection<WorkerSlot> 
allSlotsAvailableForScheduling(Collection<SupervisorDetails> 
existingSupervisors, Topologies topologies, Set<String> 
topologiesMissingAssignments) {
+            return null;
+        }
+
+        @Override
+        public void assignSlots(Topologies topologies, Map<String, 
Collection<WorkerSlot>> newSlotsByTopologyId) {
+
+        }
+
+        @Override
+        public String getHostName(Map<String, SupervisorDetails> 
existingSupervisors, String nodeId) {
+            if (existingSupervisors.containsKey(nodeId)) {
+                return existingSupervisors.get(nodeId).getHost();
+            }
+            return null;
+        }
+
+        @Override
+        public IScheduler getForcedScheduler() {
+            return null;
+        }
+    }
+
+    public static Map<String, SchedulerAssignmentImpl> 
assignmentMapToImpl(Map<String, SchedulerAssignment> assignmentMap) {
+        Map<String, SchedulerAssignmentImpl> impl = new HashMap<>();
+        for (Map.Entry<String, SchedulerAssignment> entry : 
assignmentMap.entrySet()) {
+            impl.put(entry.getKey(), (SchedulerAssignmentImpl) 
entry.getValue());
+        }
+        return impl;
+    }
+}

Reply via email to