STORM-2083 Blacklist scheduler * addressed review comments from @revans2 * also fixed failing test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/74cc7e2d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/74cc7e2d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/74cc7e2d Branch: refs/heads/master Commit: 74cc7e2d8283dffbb2cd6f97e4b3d410f87c40bb Parents: a7b86c3 Author: Jungtaek Lim <[email protected]> Authored: Wed Sep 27 10:18:15 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Wed Sep 27 14:09:31 2017 +0900 ---------------------------------------------------------------------- .../test/clj/org/apache/storm/nimbus_test.clj | 4 +- storm-server/pom.xml | 2 +- .../java/org/apache/storm/DaemonConfig.java | 50 +++++++----- .../org/apache/storm/daemon/nimbus/Nimbus.java | 85 +++++++++++--------- .../scheduler/blacklist/BlacklistScheduler.java | 70 +++++++++------- .../apache/storm/scheduler/blacklist/Sets.java | 50 +++++++++++- .../blacklist/reporters/IReporter.java | 3 +- .../blacklist/reporters/LogReporter.java | 7 +- .../strategies/DefaultBlacklistStrategy.java | 45 ++++++----- .../strategies/IBlacklistStrategy.java | 11 +-- 10 files changed, 208 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 38b5da0..1f45f9b 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1619,13 +1619,12 @@ {STORM-ZOOKEEPER-AUTH-SCHEME scheme STORM-ZOOKEEPER-AUTH-PAYLOAD digest STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal" + NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-THRIFT-PORT 6666}) expected-acls Nimbus/ZK_ACLS 
fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil)) fake-cu (proxy [ServerConfigUtils] [] (nimbusTopoHistoryStateImpl [conf] nil)) - fake-ru (proxy [ReflectionUtils] [] - (newInstanceImpl [_])) fake-utils (proxy [Utils] [] (makeUptimeComputer [] (proxy [Utils$UptimeComputer] [] (upTime [] 0)))) @@ -1633,7 +1632,6 @@ fake-common (proxy [StormCommon] [] (mkAuthorizationHandler [_] nil))] (with-open [_ (ServerConfigUtilsInstaller. fake-cu) - _ (ReflectionUtilsInstaller. fake-ru) _ (UtilsInstaller. fake-utils) - (StormCommonInstaller. fake-common) zk-le (MockedZookeeper. (proxy [Zookeeper] [] http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/pom.xml ---------------------------------------------------------------------- diff --git a/storm-server/pom.xml b/storm-server/pom.xml index b57151c..7449257 100644 --- a/storm-server/pom.xml +++ b/storm-server/pom.xml @@ -136,7 +136,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>4000</maxAllowedViolations> + <maxAllowedViolations>3590</maxAllowedViolations> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index c5ee27a..00674c7 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -18,15 +18,25 @@ package org.apache.storm; -import org.apache.storm.scheduler.blacklist.reporters.IReporter; -import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; -import org.apache.storm.container.ResourceIsolationInterface; -import 
org.apache.storm.nimbus.ITopologyActionNotifierPlugin; -import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; -import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; -import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; +import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger; +import static org.apache.storm.validation.ConfigValidationAnnotations.isString; +import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList; +import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList; +import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber; +import static org.apache.storm.validation.ConfigValidationAnnotations.isType; +import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; +import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; +import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; +import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber; +import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass; +import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType; +import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList; +import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom; + import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import 
org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -36,8 +46,6 @@ import org.apache.storm.validation.Validated; import java.util.ArrayList; import java.util.Map; -import static org.apache.storm.validation.ConfigValidationAnnotations.*; - /** * Storm configs are specified as a plain old map. This class provides constants for * all the configurations possible on a Storm cluster. Each constant is paired with an annotation @@ -110,32 +118,32 @@ public class DaemonConfig implements Validated { public static final String STORM_SCHEDULER = "storm.scheduler"; /** - * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors + * The number of seconds (the tolerance window) over which the blacklist scheduler tracks bad slots or supervisors. */ - @isInteger + @isPositiveNumber public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs"; /** - * The number of hit count that will trigger blacklist in tolerance time + * The number of hits within the tolerance time that will trigger blacklisting. */ - @isInteger + @isPositiveNumber public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count"; /** - * The number of seconds that the blacklisted slots or supervisor will be resumed + * The number of seconds after which blacklisted slots or supervisors will be resumed. */ - @isInteger + @isPositiveNumber public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs"; /** - * The class that the blacklist scheduler will report the blacklist + * The class that the blacklist scheduler will use to report the blacklist. */ @NotNull @isImplementationOfClass(implementsClass = IReporter.class) public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter"; /** - * The class that specifies the eviction strategy to use in blacklist scheduler + * The class that specifies the blacklist strategy to use in the blacklist scheduler.
*/ @NotNull @isImplementationOfClass(implementsClass = IBlacklistStrategy.class) @@ -144,7 +152,7 @@ public class DaemonConfig implements Validated { /** * Whether we want to display all the resource capacity and scheduled usage on the UI page. * You MUST have this variable set if you are using any kind of resource-related scheduler. - * + * <p/> * If this is not set, we will not display resource capacity and usage on the UI. */ @isBoolean @@ -155,7 +163,7 @@ public class DaemonConfig implements Validated { * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN} * implementation to access optional settings. */ - @isType(type=Map.class) + @isType(type = Map.class) public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params"; /** @@ -873,7 +881,9 @@ public class DaemonConfig implements Validated { * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure * per user resource guarantees. 
*/ - @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class}) + @isMapEntryCustom( + keyValidatorClasses = {ConfigValidation.StringValidator.class}, + valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class}) public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools"; /** http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 7331906..e54aaeb 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -134,6 +134,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; import org.apache.storm.nimbus.ITopologyValidator; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Cluster.SupervisorResources; import org.apache.storm.scheduler.DefaultScheduler; import org.apache.storm.scheduler.ExecutorDetails; import org.apache.storm.scheduler.INimbus; @@ -144,10 +145,9 @@ import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.BlacklistScheduler; import org.apache.storm.scheduler.multitenant.MultitenantScheduler; import org.apache.storm.scheduler.resource.ResourceAwareScheduler; -import org.apache.storm.scheduler.Cluster.SupervisorResources; -import org.apache.storm.scheduler.blacklist.BlacklistScheduler; import 
org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.security.INimbusCredentialPlugin; import org.apache.storm.security.auth.AuthUtils; @@ -184,7 +184,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Nimbus implements Iface, Shutdownable, DaemonCommon { - private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class); + private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class); // Metrics private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls"); @@ -196,7 +196,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter deactivateCalls = StormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls"); private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls"); private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls"); - private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls"); + private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter( + "nimbus:num-getComponentPendingProfileActions-calls"); private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls"); private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls"); private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls"); @@ -212,21 +213,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter getClusterInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls"); private static final Meter getLeaderCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls"); private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls"); - private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls"); + private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter( + "nimbus:num-getTopologyInfoWithOpts-calls"); private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls"); private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls"); private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls"); private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls"); - private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls"); - private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", new ExponentiallyDecayingReservoir()); + private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", + new ExponentiallyDecayingReservoir()); + private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter( + "nimbus:num-getOwnerResourceSummaries-calls"); private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls"); // END Metrics private static final String STORM_VERSION = VersionInfo.getVersion(); + @VisibleForTesting public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0), new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, 
ZooDefs.Ids.ANYONE_ID_UNSAFE)); + private static final Subject NIMBUS_SUBJECT = new Subject(); + static { NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal()); NIMBUS_SUBJECT.setReadOnly(); @@ -442,13 +449,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { @SuppressWarnings("deprecation") private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600), - (id, stream) -> { - try { - stream.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + (id, stream) -> { + try { + stream.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) { @@ -488,26 +495,26 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { @SuppressWarnings("deprecation") private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600), - (id, stream) -> { - try { - if (stream instanceof AtomicOutputStream) { - ((AtomicOutputStream) stream).cancel(); - } else { - stream.close(); - } - } catch (Exception e) { - throw new RuntimeException(e); + (id, stream) -> { + try { + if (stream instanceof AtomicOutputStream) { + ((AtomicOutputStream) stream).cancel(); + } else { + stream.close(); } - }); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } /** * Constructs a TimeCacheMap instance with a blobstore timeout and no callback function. 
- * @param conf - * @return + * @param conf the config to use + * @return the newly created TimeCacheMap */ @SuppressWarnings("deprecation") - private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) { + private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600)); } @@ -528,7 +535,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { @SuppressWarnings("unchecked") private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) { - Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER); + Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get( + DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER); List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>(); if (consumers != null) { for (Map<String, Object> consumer : consumers) { @@ -561,15 +569,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return kseq.getKeySequenceNumber(conf); } - private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { + private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, + IOException { return tc.readTopology(topoId, getSubject()); } - private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { + private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, + AuthorizationException, IOException { return tc.readTopoConf(topoId, NIMBUS_SUBJECT); } - private static StormTopology 
readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { + private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, + AuthorizationException, IOException { return tc.readTopology(topoId, NIMBUS_SUBJECT); } @@ -624,8 +635,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return ret; } - private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments, - Map<String, Assignment> existingAssignments) { + private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort( + Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) { Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments); // Print some useful information if (existingAssignments != null && !existingAssignments.isEmpty()) { @@ -672,7 +683,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { value.sort((a, b) -> a.get(0).compareTo(b.get(0))); slotAssigned.put(key, value); } - HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : Utils.reverseMap(newExecToNodePort); + HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? 
new HashMap<>() : + Utils.reverseMap(newExecToNodePort); HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>(); for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) { List<List<Long>> value = new ArrayList<>(entry.getValue()); @@ -713,7 +725,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return state.getTopoId(topoName).isPresent(); } - private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException { + private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, + IOException { try { return readTopoConfAsNimbus(topoId, tc); //Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime, @@ -1090,7 +1103,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { this.topoCache = topoCache; this.blobDownloaders = makeBlobCacheMap(conf); this.blobUploaders = makeBlobCacheMap(conf); - this.blobListers = makeBlobListCachMap(conf); + this.blobListers = makeBlobListCacheMap(conf); this.uptime = Utils.makeUptimeComputer(); this.validator = ReflectionUtils.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName())); this.timer = new StormTimer(null, (t, e) -> { http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java index a05d814..8083e01 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java +++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java @@ -15,9 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler.blacklist; import com.google.common.collect.EvictingQueue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.Cluster; @@ -30,15 +39,10 @@ import org.apache.storm.scheduler.blacklist.reporters.LogReporter; import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy; import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; public class BlacklistScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); @@ -49,7 +53,7 @@ public class BlacklistScheduler implements IScheduler { private final IScheduler underlyingScheduler; @SuppressWarnings("rawtypes") - private Map _conf; + private Map conf; protected int toleranceTime; protected int toleranceCount; @@ -74,22 +78,25 @@ public class BlacklistScheduler implements IScheduler { public void prepare(Map conf) { LOG.info("Preparing black list scheduler"); underlyingScheduler.prepare(conf); - _conf = conf; + this.conf = conf; - toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); - toleranceCount = ObjectReader.getInt( 
_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); - resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); + toleranceCount = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), + DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); - String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), + String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), LogReporter.class.getName()); reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); - String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), + String strategyClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), DefaultBlacklistStrategy.class.getName()); blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy"); - nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); - blacklistStrategy.prepare(_conf); + nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); + blacklistStrategy.prepare(this.conf); windowSize = toleranceTime / nimbusMonitorFreqSecs; badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize); @@ -108,11 +115,11 @@ public class BlacklistScheduler implements IScheduler { @Override public void schedule(Topologies topologies, Cluster cluster) { LOG.debug("running Black List scheduler"); - Map<String, 
SupervisorDetails> supervisors = cluster.getSupervisors(); LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots()); LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots()); LOG.debug("UsedSlots: {}", cluster.getUsedSlots()); + Map<String, SupervisorDetails> supervisors = cluster.getSupervisors(); blacklistStrategy.resumeFromBlacklist(); badSupervisors(supervisors); Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies); @@ -169,7 +176,8 @@ public class BlacklistScheduler implements IScheduler { } private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) { - Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies); + Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), + cluster, topologies); Set<String> blacklistHostSet = new HashSet<>(); for (String supervisor : blacklistSet) { String host = cluster.getHost(supervisor); @@ -226,23 +234,29 @@ public class BlacklistScheduler implements IScheduler { slots.remove(slot); cachedSupervisors.put(supervisorKey, slots); } - LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot); + LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. 
Will be removed from cache.", + workerSlot); } } } private Object initializeInstance(String className, String representation) { try { - return Class.forName(className).newInstance(); - } catch (ClassNotFoundException e) { - LOG.error("Can't find {} for name {}", representation, className); - throw new RuntimeException(e); - } catch (InstantiationException e) { - LOG.error("Throw InstantiationException {} for name {}", representation, className); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - LOG.error("Throw IllegalAccessException {} for name {}", representation, className); - throw new RuntimeException(e); + return ReflectionUtils.newInstance(className); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + + if (cause instanceof ClassNotFoundException) { + LOG.error("Can't find {} for name {}", representation, className); + } else if (cause instanceof InstantiationException) { + LOG.error("Throw InstantiationException {} for name {}", representation, className); + } else if (cause instanceof IllegalAccessException) { + LOG.error("Throw IllegalAccessException {} for name {}", representation, className); + } else { + LOG.error("Throw unexpected exception {} {} for name {}", cause, representation, className); + } + + throw e; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java index 93c38cb..57344e6 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.scheduler.blacklist; import java.util.HashSet; @@ -22,35 +23,82 @@ import java.util.Set; public class Sets { + /** + * Calculate union of both sets. + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return the Set which is union of both Sets. + */ public static <T> Set<T> union(Set<T> setA, Set<T> setB) { Set<T> result = new HashSet<T>(setA); result.addAll(setB); return result; } + /** + * Calculate intersection of both sets. + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return the Set which is intersection of both Sets. + */ public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) { Set<T> result = new HashSet<T>(setA); result.retainAll(setB); return result; } + /** + * Calculate the difference of two sets (the elements of setA that are not in setB). + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return a new Set containing the elements of setA that are not in setB. + */ public static <T> Set<T> difference(Set<T> setA, Set<T> setB) { Set<T> result = new HashSet<T>(setA); result.removeAll(setB); return result; } + /** + * Calculate symmetric difference of two sets. + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return the Set which is symmetric difference of two sets. + */ public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) { Set<T> union = union(setA, setB); Set<T> intersection = intersection(setA, setB); return difference(union, intersection); } + /** + * Check whether a set is a subset of another set. + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return true when setA is a subset of setB, false otherwise.
+ */ public static <T> boolean isSubset(Set<T> setA, Set<T> setB) { return setB.containsAll(setA); } - + /** + * Check whether a set is a superset of another set. + * + * @param setA parameter 1 + * @param setB parameter 2 + * @param <T> generic type of Set elements. + * @return true when setA is a superset of setB, false otherwise. + */ public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) { return setA.containsAll(setB); } http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java index 781c37a..153829c 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler.blacklist.reporters; import java.util.List; @@ -22,7 +23,7 @@ import java.util.Map; import java.util.Set; /** - * report blacklist to alert system + * Reports the blacklist to an alert system.
*/ public interface IReporter { void report(String message); http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java index 94cfebd..3255c9d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java @@ -15,15 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.scheduler.blacklist.reporters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.scheduler.blacklist.reporters; import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class LogReporter implements IReporter { private static Logger LOG = LoggerFactory.getLogger(LogReporter.class); http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java index cc7f403..00cf25a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java @@ -15,8 +15,16 @@ * See the License 
for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler.blacklist.strategies; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + import org.apache.storm.DaemonConfig; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.SupervisorDetails; @@ -29,13 +37,6 @@ import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - public class DefaultBlacklistStrategy implements IBlacklistStrategy { private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); @@ -43,24 +44,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; - private IReporter _reporter; + private IReporter reporter; - private int _toleranceCount; - private int _resumeTime; - private int _nimbusMonitorFreqSecs; + private int toleranceCount; + private int resumeTime; + private int nimbusMonitorFreqSecs; private TreeMap<String, Integer> blacklist; @Override - public void prepare(Map conf){ - _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); - _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + public void prepare(Map conf) { + toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), LogReporter.class.getName()); - _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); + reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); - _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); + nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); blacklist = new TreeMap<>(); } @@ -78,12 +80,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { for (Map.Entry<String, Integer> entry : countMap.entrySet()) { String supervisor = entry.getKey(); int count = entry.getValue(); - if (count >= _toleranceCount) { + if (count >= toleranceCount) { if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config LOG.debug("add supervisor {} to blacklist", supervisor); LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures); - _reporter.reportBlacklist(supervisor, supervisorsWithFailures); - blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs); + reporter.reportBlacklist(supervisor, supervisorsWithFailures); + blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs); } } } @@ -130,8 +132,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount; if (shortage > 0) { - LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{},num blacklist :{}, will release some blacklist." 
- , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); + LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " + + "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); + //release earliest blacklist Set<String> readyToRemove = new HashSet<>(); for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java index a35a1d2..f050006 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java @@ -15,22 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.scheduler.blacklist.strategies; -import org.apache.storm.scheduler.Cluster; -import org.apache.storm.scheduler.Topologies; +package org.apache.storm.scheduler.blacklist.strategies; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Topologies; + public interface IBlacklistStrategy { void prepare(Map conf); /** - * Get blacklist by blacklist strategy + * Get blacklist by blacklist strategy. + * * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window * @param cluster the cluster these topologies are running in. 
`cluster` contains everything user * need to develop a new scheduling logic. e.g. supervisors information, available slots, current
