Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141188696
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
 ---
    @@ -0,0 +1,248 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import com.google.common.collect.EvictingQueue;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.blacklist.reporters.IReporter;
    +import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
    +import 
org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
    +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
    +import org.apache.storm.utils.ObjectReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +
    +public class BlacklistScheduler implements IScheduler {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
    +
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; // fallback used by prepare() when the setting is absent
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 
3; // fallback used by prepare() when the setting is absent
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 
300; // fallback used by prepare() when the setting is absent
    +
    +    private final IScheduler underlyingScheduler; // prepare(), schedule() and config() delegate to this scheduler
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf; // configuration map handed to prepare()
    +
    +    protected int toleranceTime; // divided by nimbusMonitorFreqSecs to obtain windowSize
    +    protected int toleranceCount; // read from the configuration in prepare()
    +    protected int resumeTime; // read from the configuration in prepare()
    +    protected IReporter reporter; // instantiated in prepare() from the configured class name
    +    protected IBlacklistStrategy blacklistStrategy; // decides which supervisors are blacklisted
    +
    +    protected int nimbusMonitorFreqSecs; // presumably the interval between schedule() calls, in seconds - TODO confirm
    +
    +    protected Map<String, Set<Integer>> cachedSupervisors; // supervisor key -> ports seen so far for that supervisor
    +
    +    // Sliding window of "bad supervisor" snapshots; one map is added per schedule() call. In each map the key is the supervisor key and the value is the set of that supervisor's ports found missing.
    +    protected EvictingQueue<HashMap<String, Set<Integer>>> 
badSupervisorsToleranceSlidingWindow;
    +    protected int windowSize; // capacity of the sliding window: toleranceTime / nimbusMonitorFreqSecs
    +    protected Set<String> blacklistHost; // hosts blacklisted by the latest schedule() call; its size is exposed as a gauge
    +
    +    /**
    +     * Creates a blacklist scheduler that wraps another scheduler.
    +     *
    +     * @param underlyingScheduler scheduler that {@code prepare}, {@code schedule} and
    +     *     {@code config} delegate to
    +     */
    +    public BlacklistScheduler(final IScheduler underlyingScheduler) {
    +        this.underlyingScheduler = underlyingScheduler;
    +    }
    +
    +    /**
    +     * Prepares the wrapped scheduler, then loads the blacklist settings and resets all state.
    +     *
    +     * <p>Tolerance time, tolerance count and resume time fall back to the
    +     * {@code DEFAULT_BLACKLIST_SCHEDULER_*} constants when absent from {@code conf}; the reporter
    +     * and strategy class names fall back to {@link LogReporter} and
    +     * {@link DefaultBlacklistStrategy}. The sliding window is sized to
    +     * {@code toleranceTime / nimbusMonitorFreqSecs} entries (integer division).
    +     *
    +     * @param conf daemon configuration; also handed to the underlying scheduler and the strategy
    +     * @throws IllegalArgumentException if {@link DaemonConfig#NIMBUS_MONITOR_FREQ_SECS} resolves
    +     *     to a value that is not positive, because no window size can be derived from it
    +     */
    +    @Override
    +    public void prepare(Map conf) {
    +        LOG.info("Preparing black list scheduler");
    +        underlyingScheduler.prepare(conf);
    +        _conf = conf;
    +
    +        toleranceTime = ObjectReader.getInt(
    +                _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
    +                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
    +        toleranceCount = ObjectReader.getInt(
    +                _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
    +                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
    +        resumeTime = ObjectReader.getInt(
    +                _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
    +                DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
    +
    +        String reporterClassName = ObjectReader.getString(
    +                _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
    +                LogReporter.class.getName());
    +        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
    +
    +        String strategyClassName = ObjectReader.getString(
    +                _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
    +                DefaultBlacklistStrategy.class.getName());
    +        blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
    +
    +        nimbusMonitorFreqSecs = ObjectReader.getInt(_conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
    +        if (nimbusMonitorFreqSecs <= 0) {
    +            // Fail with a descriptive message instead of a bare "/ by zero" (zero) or a
    +            // queue-size error (negative) when the window size is computed below.
    +            throw new IllegalArgumentException(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS
    +                    + " must be positive to size the blacklist tolerance window, but was "
    +                    + nimbusMonitorFreqSecs);
    +        }
    +        blacklistStrategy.prepare(_conf);
    +
    +        windowSize = toleranceTime / nimbusMonitorFreqSecs;
    +        if (windowSize == 0) {
    +            // A zero-capacity EvictingQueue accepts elements but never retains them.
    +            LOG.warn("Blacklist tolerance time {} is shorter than the nimbus monitor frequency {}; "
    +                    + "the tolerance window will not retain any samples.",
    +                    toleranceTime, nimbusMonitorFreqSecs);
    +        }
    +        badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
    +        cachedSupervisors = new HashMap<>();
    +        blacklistHost = new HashSet<>();
    +
    +        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() {
    +            @Override
    +            public Integer call() throws Exception {
    +                //nimbus:num-blacklisted-supervisor + non-blacklisted supervisors = nimbus:num-supervisors
    +                return blacklistHost.size();
    +            }
    +        });
    +    }
    +
    +    /**
    +     * Refreshes the blacklist and then lets the underlying scheduler do the actual scheduling.
    +     *
    +     * <p>In order: the strategy resumes hosts from the blacklist, the currently missing
    +     * supervisors and ports are recorded in the sliding window, the resulting blacklisted hosts
    +     * are set on the cluster, stale cache entries are dropped, and finally the call is delegated.
    +     *
    +     * @param topologies topologies passed through to the strategy and the underlying scheduler
    +     * @param cluster cluster whose supervisors are inspected and whose blacklist is updated
    +     */
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        LOG.debug("running Black List scheduler");
    +        final Map<String, SupervisorDetails> liveSupervisors = cluster.getSupervisors();
    +        LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
    +        LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
    +        LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
    +
    +        blacklistStrategy.resumeFromBlacklist();
    +        badSupervisors(liveSupervisors);
    +
    +        // Publish the new blacklist to the gauge field first, then to the cluster.
    +        final Set<String> refreshed = getBlacklistHosts(cluster, topologies);
    +        this.blacklistHost = refreshed;
    +        cluster.setBlacklistedHosts(refreshed);
    +
    +        removeLongTimeDisappearFromCache();
    +
    +        underlyingScheduler.schedule(topologies, cluster);
    +    }
    +
    +    /**
    +     * Returns the configuration reported by the underlying scheduler, unchanged.
    +     *
    +     * @return whatever {@code underlyingScheduler.config()} returns
    +     */
    +    @Override
    +    public Map<String, Object> config() {
    +        final Map<String, Object> delegated = underlyingScheduler.config();
    +        return delegated;
    +    }
    +
    +    /**
    +     * Records which cached supervisors and ports are missing right now and appends the result
    +     * to the tolerance sliding window.
    +     *
    +     * <p>A cached supervisor that is absent from {@code supervisors} is recorded with all of its
    +     * cached ports. A supervisor that is present and already cached is recorded with only the
    +     * ports returned by {@code badSlots}, if any. A supervisor seen for the first time is added
    +     * to the cache and not recorded as bad.
    +     *
    +     * @param supervisors supervisors currently known to the cluster, keyed by supervisor key
    +     */
    +    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
    +        Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
    +        Set<String> supervisorsKeySet = supervisors.keySet();
    +
    +        //cached supervisor doesn't show up
    +        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);
    +        HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
    +        for (String key : badSupervisorKeys) {
    +            badSupervisors.put(key, cachedSupervisors.get(key));
    +        }
    +
    +        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
    +            String key = entry.getKey();
    +            SupervisorDetails supervisorDetails = entry.getValue();
    +            if (cachedSupervisors.containsKey(key)) {
    +                Set<Integer> badSlots = badSlots(supervisorDetails, key);
    +                if (!badSlots.isEmpty()) { //supervisor contains bad slots
    +                    badSupervisors.put(key, badSlots);
    +                }
    +            } else {
    +                // New supervisor to cache. Cached port sets are mutated later when stale ports
    +                // are dropped, so the cache keeps its own mutable copy instead of the set
    +                // handed out by SupervisorDetails.
    +                cachedSupervisors.put(key, new HashSet<>(supervisorDetails.getAllPorts()));
    +            }
    +        }
    +
    +        badSupervisorsToleranceSlidingWindow.add(badSupervisors);
    +    }
    +
    +    /**
    +     * Compares a supervisor's current ports with its cached ports.
    +     *
    +     * <p>Ports that the supervisor reports but the cache does not yet know are merged into the
    +     * cache. Ports that are cached but no longer reported are returned as bad slots.
    +     *
    +     * @param supervisor supervisor as currently reported by the cluster
    +     * @param supervisorKey key under which the supervisor is cached; must already be cached
    +     * @return cached ports that the supervisor no longer reports; empty when none are missing
    +     */
    +    private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
    +        Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
    +        Set<Integer> supervisorPorts = supervisor.getAllPorts();
    +
    +        Set<Integer> newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts);
    +        if (!newPorts.isEmpty()) {
    +            // Add new ports to the cached supervisor. The cached set is modified later when
    +            // stale ports are removed, so store an independent, mutable HashSet rather than
    +            // whatever kind of set Sets.union happens to return.
    +            cachedSupervisors.put(supervisorKey, new HashSet<>(Sets.union(newPorts, cachedSupervisorPorts)));
    +        }
    +
    +        // Computed against the port set that was cached on entry, as before.
    +        return Sets.difference(cachedSupervisorPorts, supervisorPorts);
    +    }
    +
    +    private Set<String> getBlacklistHosts(Cluster cluster, Topologies 
topologies) {
    +        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new 
ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
    +        Set<String> blacklistHostSet = new HashSet<>();
    +        for (String supervisor : blacklistSet) {
    +            String host = cluster.getHost(supervisor);
    +            if (host != null) {
    +                blacklistHostSet.add(host);
    +            } else {
    +                LOG.info("supervisor {} is not alive, do not need to add 
to blacklist.", supervisor);
    +            }
    +        }
    +        return blacklistHostSet;
    +    }
    +
    +    /**
    +     * supervisor or port never exits once in tolerance time will be 
removed from cache.
    +     */
    +    private void removeLongTimeDisappearFromCache() {
    +
    +        Map<String, Integer> supervisorCountMap = new HashMap<String, 
Integer>();
    +        Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, 
Integer>();
    +
    +        for (Map<String, Set<Integer>> item : 
badSupervisorsToleranceSlidingWindow) {
    +            Set<String> supervisors = item.keySet();
    +            for (String supervisor : supervisors) {
    +                int supervisorCount = 
supervisorCountMap.getOrDefault(supervisor, 0);
    +                Set<Integer> slots = item.get(supervisor);
    +                if (slots.equals(cachedSupervisors.get(supervisor))) { // 
treat supervisor as bad only if all slots are bad
    +                    supervisorCountMap.put(supervisor, supervisorCount + 
1);
    +                }
    +                for (Integer slot : slots) {
    +                    WorkerSlot workerSlot = new WorkerSlot(supervisor, 
slot);
    +                    int slotCount = slotCountMap.getOrDefault(workerSlot, 
0);
    +                    slotCountMap.put(workerSlot, slotCount + 1);
    +                }
    +            }
    +        }
    +
    +        for (Map.Entry<String, Integer> entry : 
supervisorCountMap.entrySet()) {
    +            String key = entry.getKey();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // supervisor which was never back 
to normal in tolerance period will be removed from cache
    +                cachedSupervisors.remove(key);
    +                LOG.info("Supervisor {} was never back to normal during 
tolerance period, probably dead. Will remove from cache.", key);
    +            }
    +        }
    +
    +        for (Map.Entry<WorkerSlot, Integer> entry : 
slotCountMap.entrySet()) {
    +            WorkerSlot workerSlot = entry.getKey();
    +            String supervisorKey = workerSlot.getNodeId();
    +            Integer slot = workerSlot.getPort();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // worker slot which was never back 
to normal in tolerance period will be removed from cache
    +                Set<Integer> slots = cachedSupervisors.get(supervisorKey);
    +                if (slots != null) { // slots will be null while 
supervisor has been removed from cached supervisors
    +                    slots.remove(slot);
    +                    cachedSupervisors.put(supervisorKey, slots);
    +                }
    +                LOG.info("Worker slot {} was never back to normal during 
tolerance period, probably dead. Will be removed from cache.", workerSlot);
    +            }
    +        }
    +    }
    +
    +    private Object initializeInstance(String className, String 
representation) {
    --- End diff --
    
    I think there is already a function in Utils or ServerUtils that can do 
this for us.


---

Reply via email to