STORM-2083 Blacklist scheduler

* address review comments from @HeartSaVioR and @revans2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7b86c32 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7b86c32 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7b86c32 Branch: refs/heads/master Commit: a7b86c328ad56adf7e7259aeb2a9bcc1ece88b4f Parents: 25c27c6 Author: Jungtaek Lim <[email protected]> Authored: Mon Sep 25 22:16:48 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Wed Sep 27 14:09:31 2017 +0900 ---------------------------------------------------------------------- .../scheduler/blacklist/BlacklistScheduler.java | 113 +++++++++---------- .../blacklist/reporters/IReporter.java | 4 +- .../blacklist/reporters/LogReporter.java | 8 +- .../strategies/DefaultBlacklistStrategy.java | 68 +++++------ .../strategies/IBlacklistStrategy.java | 2 +- .../scheduler/blacklist/FaultGenerateUtils.java | 11 -- .../blacklist/TestBlacklistScheduler.java | 67 +++++------ 7 files changed, 123 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java index 0080134..a05d814 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java @@ -18,7 +18,6 @@ package org.apache.storm.scheduler.blacklist; import com.google.common.collect.EvictingQueue; -import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import 
org.apache.storm.scheduler.Cluster; @@ -27,9 +26,10 @@ import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.reporters.LogReporter; +import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy; import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +42,11 @@ import java.util.concurrent.Callable; public class BlacklistScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); + + public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; + public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; + public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300; + private final IScheduler underlyingScheduler; @SuppressWarnings("rawtypes") private Map _conf; @@ -67,51 +72,27 @@ public class BlacklistScheduler implements IScheduler { @Override public void prepare(Map conf) { - LOG.info("prepare black list scheduler"); + LOG.info("Preparing black list scheduler"); underlyingScheduler.prepare(conf); _conf = conf; - if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { - toleranceTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)); - } - if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) { - toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)); - } - if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) { - resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)); - } - String reporterClassName = 
_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : "org.apache.storm.scheduler.blacklist.reporters.LogReporter" ; - try { - reporter = (IReporter) Class.forName(reporterClassName).newInstance(); - } catch (ClassNotFoundException e) { - LOG.error("Can't find blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } catch (InstantiationException e) { - LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } - String strategyClassName = _conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) : "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy"; - try { - blacklistStrategy = (IBlacklistStrategy) Class.forName(strategyClassName).newInstance(); - } catch (ClassNotFoundException e) { - LOG.error("Can't find blacklist strategy for name {}", strategyClassName); - throw new RuntimeException(e); - } catch (InstantiationException e) { - LOG.error("Throw InstantiationException blacklist strategy for name {}", strategyClassName); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - LOG.error("Throw illegalAccessException blacklist strategy for name {}", strategyClassName); - throw new RuntimeException(e); - } + toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); + toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + + String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), + LogReporter.class.getName()); + reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); + + String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), + DefaultBlacklistStrategy.class.getName()); + blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy"); nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); blacklistStrategy.prepare(_conf); - windowSize=toleranceTime / nimbusMonitorFreqSecs; - badSupervisorsToleranceSlidingWindow =EvictingQueue.create(windowSize); + windowSize = toleranceTime / nimbusMonitorFreqSecs; + badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize); cachedSupervisors = new HashMap<>(); blacklistHost = new HashSet<>(); @@ -134,7 +115,9 @@ public class BlacklistScheduler implements IScheduler { blacklistStrategy.resumeFromBlacklist(); badSupervisors(supervisors); - cluster.setBlacklistedHosts(getBlacklistHosts(cluster, topologies)); + Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies); + this.blacklistHost = blacklistHosts; + cluster.setBlacklistedHosts(blacklistHosts); removeLongTimeDisappearFromCache(); underlyingScheduler.schedule(topologies, cluster); @@ -149,7 +132,7 @@ public class BlacklistScheduler implements IScheduler { Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet(); Set<String> supervisorsKeySet = supervisors.keySet(); - Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);//cached supervisor doesn't show up + Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); //cached supervisor doesn't show up HashMap<String, Set<Integer>> badSupervisors = new HashMap<>(); for (String key : badSupervisorKeys) 
{ badSupervisors.put(key, cachedSupervisors.get(key)); @@ -160,11 +143,11 @@ public class BlacklistScheduler implements IScheduler { SupervisorDetails supervisorDetails = entry.getValue(); if (cachedSupervisors.containsKey(key)) { Set<Integer> badSlots = badSlots(supervisorDetails, key); - if (badSlots.size() > 0) {//supervisor contains bad slots + if (badSlots.size() > 0) { //supervisor contains bad slots badSupervisors.put(key, badSlots); } } else { - cachedSupervisors.put(key, supervisorDetails.getAllPorts());//new supervisor to cache + cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache } } @@ -185,7 +168,7 @@ public class BlacklistScheduler implements IScheduler { return badSlots; } - public Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) { + private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) { Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies); Set<String> blacklistHostSet = new HashSet<>(); for (String supervisor : blacklistSet) { @@ -193,10 +176,9 @@ public class BlacklistScheduler implements IScheduler { if (host != null) { blacklistHostSet.add(host); } else { - LOG.info("supervisor {} is not alive know, do not need to add to blacklist.", supervisor); + LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisor); } } - this.blacklistHost = blacklistHostSet; return blacklistHostSet; } @@ -211,20 +193,14 @@ public class BlacklistScheduler implements IScheduler { for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) { Set<String> supervisors = item.keySet(); for (String supervisor : supervisors) { - int supervisorCount = 0; - if (supervisorCountMap.containsKey(supervisor)) { - supervisorCount = supervisorCountMap.get(supervisor); - } + int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0); Set<Integer> slots = item.get(supervisor); - 
if(slots.equals(cachedSupervisors.get(supervisor))){//only all slots are bad means supervisor is bad + if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad supervisorCountMap.put(supervisor, supervisorCount + 1); } for (Integer slot : slots) { - int slotCount = 0; WorkerSlot workerSlot = new WorkerSlot(supervisor, slot); - if (slotCountMap.containsKey(workerSlot)) { - slotCount = slotCountMap.get(workerSlot); - } + int slotCount = slotCountMap.getOrDefault(workerSlot, 0); slotCountMap.put(workerSlot, slotCount + 1); } } @@ -233,9 +209,9 @@ public class BlacklistScheduler implements IScheduler { for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) { String key = entry.getKey(); int value = entry.getValue(); - if (value == windowSize) {//supervisor never exits once in tolerance time will be removed from cache + if (value == windowSize) { // supervisor which was never back to normal in tolerance period will be removed from cache cachedSupervisors.remove(key); - LOG.info("supervisor {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", key); + LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. 
Will remove from cache.", key); } } @@ -244,14 +220,29 @@ public class BlacklistScheduler implements IScheduler { String supervisorKey = workerSlot.getNodeId(); Integer slot = workerSlot.getPort(); int value = entry.getValue(); - if (value == windowSize) {//port never exits once in tolerance time will be removed from cache + if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache Set<Integer> slots = cachedSupervisors.get(supervisorKey); - if (slots != null) {//slots will be null while supervisor has been removed from cached supervisors + if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors slots.remove(slot); cachedSupervisors.put(supervisorKey, slots); } - LOG.info("slot {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", workerSlot); + LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot); } } } + + private Object initializeInstance(String className, String representation) { + try { + return Class.forName(className).newInstance(); + } catch (ClassNotFoundException e) { + LOG.error("Can't find {} for name {}", representation, className); + throw new RuntimeException(e); + } catch (InstantiationException e) { + LOG.error("Throw InstantiationException {} for name {}", representation, className); + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + LOG.error("Throw IllegalAccessException {} for name {}", representation, className); + throw new RuntimeException(e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java ---------------------------------------------------------------------- diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java index 00dcaa4..781c37a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java @@ -17,8 +17,8 @@ */ package org.apache.storm.scheduler.blacklist.reporters; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -27,5 +27,5 @@ import java.util.Set; public interface IReporter { void report(String message); - void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer); + void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> toleranceBuffer); } http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java index d8b7679..94cfebd 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java @@ -20,8 +20,8 @@ package org.apache.storm.scheduler.blacklist.reporters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; public class LogReporter implements IReporter { @@ -33,8 +33,8 @@ public class LogReporter implements IReporter { } @Override - public void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer) { - String message = "add supervisor 
" + supervisor + " to blacklist. The bad slot history of supervisors is :" + toleranceBuffer; - report(message); + public void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> toleranceBuffer) { + LOG.warn("add supervisor {} to blacklist. The bad slot history of supervisors is : {}", + supervisor, toleranceBuffer); } } http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java index 9b2e9b0..cc7f403 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java @@ -24,6 +24,7 @@ import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.reporters.LogReporter; import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); + public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; + public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; + private IReporter _reporter; private int _toleranceCount; @@ -49,41 +53,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { @Override public void prepare(Map conf){ - if 
(conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) { - _toleranceCount = ObjectReader.getInt( conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)); - } - if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) { - _resumeTime = ObjectReader.getInt( conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)); - } - String reporterClassName = conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : "org.apache.storm.scheduler.blacklist.reporters.LogReporter" ; - try { - _reporter = (IReporter) Class.forName(reporterClassName).newInstance(); - } catch (ClassNotFoundException e) { - LOG.error("Can't find blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } catch (InstantiationException e) { - LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName); - throw new RuntimeException(e); - } + _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + + String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), + LogReporter.class.getName()); + _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); - _nimbusMonitorFreqSecs = ObjectReader.getInt( conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); + _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); blacklist = new TreeMap<>(); } @Override - public Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies 
topologies) { + public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) { Map<String, Integer> countMap = new HashMap<String, Integer>(); for (Map<String, Set<Integer>> item : supervisorsWithFailures) { Set<String> supervisors = item.keySet(); for (String supervisor : supervisors) { - int supervisorCount = 0; - if (countMap.containsKey(supervisor)) { - supervisorCount = countMap.get(supervisor); - } + int supervisorCount = countMap.getOrDefault(supervisor, 0); countMap.put(supervisor, supervisorCount + 1); } } @@ -91,9 +79,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { String supervisor = entry.getKey(); int count = entry.getValue(); if (count >= _toleranceCount) { - if (!blacklist.containsKey(supervisor)) {// if not in blacklist then add it and set the resume time according to config - LOG.info("add supervisor {} to blacklist", supervisor); - LOG.info("supervisorsWithFailures : {}", supervisorsWithFailures); + if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config + LOG.debug("add supervisor {} to blacklist", supervisor); + LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures); _reporter.reportBlacklist(supervisor, supervisorsWithFailures); blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs); } @@ -103,6 +91,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { return blacklist.keySet(); } + @Override public void resumeFromBlacklist() { Set<String> readyToRemove = new HashSet<String>(); for (Map.Entry<String, Integer> entry : blacklist.entrySet()) { @@ -116,11 +105,11 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { } for (String key : readyToRemove) { blacklist.remove(key); - LOG.info("supervisor {} reach the resume time ,removed from blacklist", key); + LOG.info("Supervisor {} has been blacklisted more than resume period. 
Removed from blacklist.", key); } } - public void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) { + private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) { if (blacklist.size() > 0) { int totalNeedNumWorkers = 0; List<TopologyDetails> needSchedulingTopologies = cluster.needsSchedulingTopologies(); @@ -145,12 +134,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); //release earliest blacklist Set<String> readyToRemove = new HashSet<>(); - for (String supervisor : blacklist.keySet()) {//blacklist is treeMap sorted by value, value minimum meas earliest + for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest if (availableSupervisors.containsKey(supervisor)) { Set<Integer> ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor)); readyToRemove.add(supervisor); shortage -= ports.size(); - if (shortage <= 0) {//released enough supervisor + if (shortage <= 0) { //released enough supervisor break; } } @@ -162,4 +151,19 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { } } } + + private Object initializeInstance(String className, String representation) { + try { + return Class.forName(className).newInstance(); + } catch (ClassNotFoundException e) { + LOG.error("Can't find {} for name {}", representation, className); + throw new RuntimeException(e); + } catch (InstantiationException e) { + LOG.error("Throw InstantiationException {} for name {}", representation, className); + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + LOG.error("Throw IllegalAccessException {} for name {}", representation, className); + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java 
---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java index df79b8c..a35a1d2 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java @@ -41,7 +41,7 @@ public interface IBlacklistStrategy { * the `cluster` object. * @return blacklisted supervisors' id set */ - Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies); + Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies); /** * resume supervisors form blacklist. Blacklist is just a temporary list for supervisors, http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java index 5b8bbe7..ece6357 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java @@ -30,17 +30,6 @@ import java.util.Map; public class FaultGenerateUtils { - public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, int[][][] faults) { - List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faults.length); - for (int[][] fault : faults) { - Map<String, 
SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount); - if (fault.length == 1 && fault[0][0] == -1) { - TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-0"); - } - } - return supervisorsList; - } - public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, List<Map<Integer, List<Integer>>> faultList) { List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faultList.size()); for (Map<Integer, List<Integer>> faults : faultList) { http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java index 049ef68..6cf8a0e 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java @@ -19,6 +19,7 @@ package org.apache.storm.scheduler.blacklist; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.scheduler.Cluster; @@ -34,6 +35,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -75,9 +77,7 @@ public class TestBlacklistScheduler { bs.schedule(topologies, cluster); cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); bs.schedule(topologies, 
cluster); - Set<String> hosts = new HashSet<>(); - hosts.add("host-0"); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts()); } @Test @@ -109,9 +109,7 @@ public class TestBlacklistScheduler { bs.schedule(topologies, cluster); cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); bs.schedule(topologies, cluster); - Set<String> hosts = new HashSet<>(); - hosts.add("host-0"); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts()); } @Test @@ -142,16 +140,13 @@ public class TestBlacklistScheduler { bs.schedule(topologies, cluster); cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); bs.schedule(topologies, cluster); - Set<String> hosts = new HashSet<>(); - hosts.add("host-0"); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts()); for (int i = 0; i < 300 / 10 - 2; i++) { bs.schedule(topologies, cluster); } - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts()); bs.schedule(topologies, cluster); - hosts.clear(); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts()); } @Test @@ -185,17 +180,14 @@ public class TestBlacklistScheduler { bs.schedule(topologies, cluster); cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); bs.schedule(topologies, cluster); - Set<String> hosts = new HashSet<>(); - 
hosts.add("host-0"); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts()); topoMap.put(topo2.getId(), topo2); topoMap.put(topo3.getId(), topo3); topoMap.put(topo4.getId(), topo4); topologies = new Topologies(topoMap); cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); bs.schedule(topologies, cluster); - hosts.clear(); - Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); + Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts()); } @Test @@ -218,21 +210,21 @@ public class TestBlacklistScheduler { List<Map<Integer, List<Integer>>> faultList = new ArrayList<>(); - faultList.add(new HashMap<Integer, List<Integer>>()); - faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1))); - faultList.add((Map) ImmutableMap.of(0, new ArrayList<>())); + faultList.add(new HashMap<>()); + faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1))); + faultList.add(ImmutableMap.of(0, new ArrayList<>())); for (int i = 0; i < 17; i++) { - faultList.add(new HashMap<Integer, List<Integer>>()); + faultList.add(new HashMap<>()); } - faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1))); - faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1))); + faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1))); + faultList.add(ImmutableMap.of(1, ImmutableList.of(1))); for (int i = 0; i < 8; i++) { - faultList.add(new HashMap<Integer, List<Integer>>()); + faultList.add(new HashMap<>()); } - faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(1))); - faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1))); + faultList.add(ImmutableMap.of(0, ImmutableList.of(1))); + faultList.add(ImmutableMap.of(1, ImmutableList.of(1))); for (int i = 0; i < 30; i++) { - faultList.add(new HashMap<Integer, List<Integer>>()); + 
faultList.add(new HashMap<>()); } List<Map<String, SupervisorDetails>> supervisorsList = FaultGenerateUtils.getSupervisorsList(3, 4, faultList); @@ -285,9 +277,9 @@ public class TestBlacklistScheduler { @Test public void removeLongTimeDisappearFromCache(){ - INimbus iNimbus=new TestUtilsForBlacklistScheduler.INimbusTest(); + INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest(); - Map<String, SupervisorDetails> supMap=TestUtilsForBlacklistScheduler.genSupervisors(3,4); + Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3,4); Config config = new Config(); config.putAll(Utils.readDefaultConfig()); @@ -301,27 +293,24 @@ public class TestBlacklistScheduler { Topologies topologies = new Topologies(topoMap); Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); - BlacklistScheduler bs=new BlacklistScheduler(new DefaultScheduler()); + BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler()); bs.prepare(config); bs.schedule(topologies,cluster); cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); - for(int i=0;i<20;i++){ + for (int i = 0 ; i < 20 ; i++){ bs.schedule(topologies,cluster); } - Set<String> cached=new HashSet<>(); + Set<String> cached = new HashSet<>(); cached.add("sup-1"); cached.add("sup-2"); Assert.assertEquals(cached,bs.cachedSupervisors.keySet()); cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); bs.schedule(topologies,cluster); cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); - for(int i=0;i<20;i++){ - bs.schedule(topologies,cluster); + for (int i = 0 ;i < 20 ; i++){ + 
bs.schedule(topologies, cluster); } - Set<Integer> cachedPorts=new HashSet<>(); - cachedPorts.add(1); - cachedPorts.add(2); - cachedPorts.add(3); - Assert.assertEquals(cachedPorts,bs.cachedSupervisors.get("sup-0")); + Set<Integer> cachedPorts = Sets.newHashSet(1, 2, 3); + Assert.assertEquals(cachedPorts, bs.cachedSupervisors.get("sup-0")); } }
