Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1744#discussion_r90521664
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -0,0 +1,3729 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.nimbus;
    +
    +import static org.apache.storm.metric.StormMetricsRegistry.registerMeter;
    +import static org.apache.storm.utils.Utils.OR;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InterruptedIOException;
    +import java.io.OutputStream;
    +import java.net.BindException;
    +import java.net.ServerSocket;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +import java.security.Principal;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.UnaryOperator;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.security.auth.Subject;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.blobstore.AtomicOutputStream;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.BlobStoreAclHandler;
    +import org.apache.storm.blobstore.BlobSynchronizer;
    +import org.apache.storm.blobstore.InputStreamWithMeta;
    +import org.apache.storm.blobstore.KeySequenceNumber;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.generated.AlreadyAliveException;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.AuthorizationException;
    +import org.apache.storm.generated.BeginDownloadResult;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.CommonAggregateStats;
    +import org.apache.storm.generated.ComponentAggregateStats;
    +import org.apache.storm.generated.ComponentPageInfo;
    +import org.apache.storm.generated.ComponentType;
    +import org.apache.storm.generated.Credentials;
    +import org.apache.storm.generated.DebugOptions;
    +import org.apache.storm.generated.ErrorInfo;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.ExecutorStats;
    +import org.apache.storm.generated.ExecutorSummary;
    +import org.apache.storm.generated.GetInfoOptions;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.KeyAlreadyExistsException;
    +import org.apache.storm.generated.KeyNotFoundException;
    +import org.apache.storm.generated.KillOptions;
    +import org.apache.storm.generated.LSTopoHistory;
    +import org.apache.storm.generated.ListBlobsResult;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.apache.storm.generated.Nimbus.Iface;
    +import org.apache.storm.generated.Nimbus.Processor;
    +import org.apache.storm.generated.NimbusSummary;
    +import org.apache.storm.generated.NodeInfo;
    +import org.apache.storm.generated.NotAliveException;
    +import org.apache.storm.generated.NumErrorsChoice;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.generated.ReadableBlobMeta;
    +import org.apache.storm.generated.RebalanceOptions;
    +import org.apache.storm.generated.SettableBlobMeta;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.SupervisorInfo;
    +import org.apache.storm.generated.SupervisorPageInfo;
    +import org.apache.storm.generated.SupervisorSummary;
    +import org.apache.storm.generated.TopologyActionOptions;
    +import org.apache.storm.generated.TopologyHistoryInfo;
    +import org.apache.storm.generated.TopologyInfo;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.generated.TopologyPageInfo;
    +import org.apache.storm.generated.TopologyStatus;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.generated.WorkerResources;
    +import org.apache.storm.generated.WorkerSummary;
    +import org.apache.storm.logging.ThriftAccessLogger;
    +import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metric.api.DataPoint;
    +import org.apache.storm.metric.api.IClusterMetricsConsumer;
    +import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo;
    +import org.apache.storm.nimbus.DefaultTopologyValidator;
    +import org.apache.storm.nimbus.ILeaderElector;
    +import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
    +import org.apache.storm.nimbus.ITopologyValidator;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.DefaultScheduler;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.INimbus;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.SchedulerAssignmentImpl;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.ResourceUtils;
    +import org.apache.storm.security.INimbusCredentialPlugin;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAuthorizer;
    +import org.apache.storm.security.auth.ICredentialsRenewer;
    +import org.apache.storm.security.auth.IGroupMappingServiceProvider;
    +import org.apache.storm.security.auth.IPrincipalToLocal;
    +import org.apache.storm.security.auth.NimbusPrincipal;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.ThriftServer;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.BufferInputStream;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.TimeCacheMap;
    +import org.apache.storm.utils.TupleUtils;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.Utils.UptimeComputer;
    +import org.apache.storm.utils.VersionInfo;
    +import org.apache.storm.validation.ConfigValidation;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.thrift.TException;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.json.simple.JSONValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.codahale.metrics.Meter;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.ImmutableMap;
    +
    +public class Nimbus implements Iface, Shutdownable, DaemonCommon {
    +    private final static Logger LOG = 
LoggerFactory.getLogger(Nimbus.class);
    +    
    +    //    Metrics
    +    private static final Meter submitTopologyWithOptsCalls = 
registerMeter("nimbus:num-submitTopologyWithOpts-calls");
    +    private static final Meter submitTopologyCalls = 
registerMeter("nimbus:num-submitTopology-calls");
    +    private static final Meter killTopologyWithOptsCalls = 
registerMeter("nimbus:num-killTopologyWithOpts-calls");
    +    private static final Meter killTopologyCalls = 
registerMeter("nimbus:num-killTopology-calls");
    +    private static final Meter rebalanceCalls = 
registerMeter("nimbus:num-rebalance-calls");
    +    private static final Meter activateCalls = 
registerMeter("nimbus:num-activate-calls");
    +    private static final Meter deactivateCalls = 
registerMeter("nimbus:num-deactivate-calls");
    +    private static final Meter debugCalls = 
registerMeter("nimbus:num-debug-calls");
    +    private static final Meter setWorkerProfilerCalls = 
registerMeter("nimbus:num-setWorkerProfiler-calls");
    +    private static final Meter getComponentPendingProfileActionsCalls = 
registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
    +    private static final Meter setLogConfigCalls = 
registerMeter("nimbus:num-setLogConfig-calls");
    +    private static final Meter uploadNewCredentialsCalls = 
registerMeter("nimbus:num-uploadNewCredentials-calls");
    +    private static final Meter beginFileUploadCalls = 
registerMeter("nimbus:num-beginFileUpload-calls");
    +    private static final Meter uploadChunkCalls = 
registerMeter("nimbus:num-uploadChunk-calls");
    +    private static final Meter finishFileUploadCalls = 
registerMeter("nimbus:num-finishFileUpload-calls");
    +    private static final Meter beginFileDownloadCalls = 
registerMeter("nimbus:num-beginFileDownload-calls");
    +    private static final Meter downloadChunkCalls = 
registerMeter("nimbus:num-downloadChunk-calls");
    +    private static final Meter getNimbusConfCalls = 
registerMeter("nimbus:num-getNimbusConf-calls");
    +    private static final Meter getLogConfigCalls = 
registerMeter("nimbus:num-getLogConfig-calls");
    +    private static final Meter getTopologyConfCalls = 
registerMeter("nimbus:num-getTopologyConf-calls");
    +    private static final Meter getTopologyCalls = 
registerMeter("nimbus:num-getTopology-calls");
    +    private static final Meter getUserTopologyCalls = 
registerMeter("nimbus:num-getUserTopology-calls");
    +    private static final Meter getClusterInfoCalls = 
registerMeter("nimbus:num-getClusterInfo-calls");
    +    private static final Meter getTopologyInfoWithOptsCalls = 
registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
    +    private static final Meter getTopologyInfoCalls = 
registerMeter("nimbus:num-getTopologyInfo-calls");
    +    private static final Meter getTopologyPageInfoCalls = 
registerMeter("nimbus:num-getTopologyPageInfo-calls");
    +    private static final Meter getSupervisorPageInfoCalls = 
registerMeter("nimbus:num-getSupervisorPageInfo-calls");
    +    private static final Meter getComponentPageInfoCalls = 
registerMeter("nimbus:num-getComponentPageInfo-calls");
    +    private static final Meter shutdownCalls = 
registerMeter("nimbus:num-shutdown-calls");
    +    // END Metrics
    +    
    +    private static final String STORM_VERSION = VersionInfo.getVersion();
    +    @VisibleForTesting
    +    public static final List<ACL> ZK_ACLS = 
Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
    +            new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +    private static final Subject NIMBUS_SUBJECT = new Subject();
    +    static {
    +        NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
    +        NIMBUS_SUBJECT.setReadOnly();
    +    }
    +    
    +    // TOPOLOGY STATE TRANSITIONS
    +    private static StormBase make(TopologyStatus status) {
    +        StormBase ret = new StormBase();
    +        ret.set_status(status);
    +        //The following are required for backwards compatibility with 
clojure code
    +        ret.set_component_executors(Collections.emptyMap());
    +        ret.set_component_debug(Collections.emptyMap());
    +        return ret;
    +    }
    +    
    +    private static final TopologyStateTransition NOOP_TRANSITION = (arg, 
nimbus, topoId, base) -> null;
    +    private static final TopologyStateTransition INACTIVE_TRANSITION = 
(arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
    +    private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, 
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
    +    private static final TopologyStateTransition KILL_TRANSITION = 
(killTime, nimbus, topoId, base) -> {
    +        int delay = 0;
    +        if (killTime != null) {
    +            delay = ((Number)killTime).intValue();
    +        } else {
    +            delay = Utils.getInt(Nimbus.readTopoConf(topoId, 
nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
    +        }
    +        nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
    +        StormBase sb = new StormBase();
    +        sb.set_status(TopologyStatus.KILLED);
    +        TopologyActionOptions tao = new TopologyActionOptions();
    +        KillOptions opts = new KillOptions();
    +        opts.set_wait_secs(delay);
    +        tao.set_kill_options(opts);
    +        sb.set_topology_action_options(tao);
    +        sb.set_component_executors(Collections.emptyMap());
    +        sb.set_component_debug(Collections.emptyMap());
    +        return sb;
    +    };
    +    
    +    private static final TopologyStateTransition REBALANCE_TRANSITION = 
(args, nimbus, topoId, base) -> {
    +        RebalanceOptions rbo = ((RebalanceOptions) args).deepCopy();
    +        int delay = 0;
    +        if (rbo.is_set_wait_secs()) {
    +            delay = rbo.get_wait_secs();
    +        } else {
    +            delay = Utils.getInt(Nimbus.readTopoConf(topoId, 
nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
    +        }
    +        nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, 
null);
    +        
    +        rbo.set_wait_secs(delay);
    +        if (!rbo.is_set_num_executors()) {
    +            rbo.set_num_executors(Collections.emptyMap());
    +        }
    +        
    +        StormBase sb = new StormBase();
    +        sb.set_status(TopologyStatus.REBALANCING);
    +        sb.set_prev_status(base.get_status());
    +        TopologyActionOptions tao = new TopologyActionOptions();
    +        tao.set_rebalance_options(rbo);
    +        sb.set_topology_action_options(tao);
    +        sb.set_component_executors(Collections.emptyMap());
    +        sb.set_component_debug(Collections.emptyMap());
    +        
    +        return sb;
    +    };
    +    
    +    private static final TopologyStateTransition 
STARTUP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> {
    +        int delay = 
base.get_topology_action_options().get_kill_options().get_wait_secs();
    +        nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
    +        return null;
    +    };
    +    
    +    private static final TopologyStateTransition REMOVE_TRANSITION = 
(args, nimbus, topoId, base) -> {
    +        LOG.info("Killing topology: {}", topoId);
    +        IStormClusterState state = nimbus.getStormClusterState();
    +        state.removeStorm(topoId);
    +        BlobStore store = nimbus.getBlobStore();
    +        if (store instanceof LocalFsBlobStore) {
    +            for (String key: Nimbus.getKeyListFromId(nimbus.getConf(), 
topoId)) {
    +                state.removeBlobstoreKey(key);
    +                state.removeKeyVersion(key);
    +            }
    +        }
    +        return null;
    +    };
    +    
    +    private static final TopologyStateTransition 
STARTUP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> {
    +        int delay = 
base.get_topology_action_options().get_rebalance_options().get_wait_secs();
    +        nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, 
null);
    +        return null;
    +    };
    +    
    +    private static final TopologyStateTransition DO_REBALANCE_TRANSITION = 
(args, nimbus, topoId, base) -> {
    +        nimbus.doRebalance(topoId, base);
    +        return Nimbus.make(base.get_prev_status());
    +    };
    +    
    +    private static final Map<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>> TOPO_STATE_TRANSITIONS = 
    +            new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>>()
    +            .put(TopologyStatus.ACTIVE, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
    +                    .put(TopologyActions.ACTIVATE, NOOP_TRANSITION)
    +                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .build())
    +            .put(TopologyStatus.INACTIVE, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION)
    +                    .put(TopologyActions.INACTIVATE, NOOP_TRANSITION)
    +                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .build())
    +            .put(TopologyStatus.KILLED, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.STARTUP, 
STARTUP_WHEN_KILLED_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .put(TopologyActions.REMOVE, REMOVE_TRANSITION)
    +                    .build())
    +            .put(TopologyStatus.REBALANCING, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.STARTUP, 
STARTUP_WHEN_REBALANCING_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .put(TopologyActions.DO_REBALANCE, 
DO_REBALANCE_TRANSITION)
    +                    .build())
    +            .build();
    +    
    +    // END TOPOLOGY STATE TRANSITIONS
    +    
    +    private static final class Assoc<K,V> implements UnaryOperator<Map<K, 
V>> {
    +        private final K key;
    +        private final V value;
    +        
    +        public Assoc(K key, V value) {
    +            this.key = key;
    +            this.value = value;
    +        }
    +        
    +        @Override
    +        public Map<K, V> apply(Map<K, V> t) {
    +            Map<K, V> ret = new HashMap<>(t);
    +            ret.put(key, value);
    +            return ret;
    +        }
    +    }
    +    
    +    private static final class Dissoc<K,V> implements UnaryOperator<Map<K, 
V>> {
    +        private final K key;
    +        
    +        public Dissoc(K key) {
    +            this.key = key;
    +        }
    +        
    +        @Override
    +        public Map<K, V> apply(Map<K, V> t) {
    +            Map<K, V> ret = new HashMap<>(t);
    +            ret.remove(key);
    +            return ret;
    +        }
    +    }
    +    
    +    @VisibleForTesting
    +    public static class StandaloneINimbus implements INimbus {
    +
    +        @Override
    +        public void prepare(@SuppressWarnings("rawtypes") Map stormConf, 
String schedulerLocalDir) {
    +            //NOOP
    +        }
    +
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public Collection<WorkerSlot> 
allSlotsAvailableForScheduling(Collection<SupervisorDetails> supervisors,
    +                Topologies topologies, Set<String> 
topologiesMissingAssignments) {
    +            Set<WorkerSlot> ret = new HashSet<>();
    +            for (SupervisorDetails sd: supervisors) {
    +                String id = sd.getId();
    +                for (Number port: (Collection<Number>)sd.getMeta()) {
    +                    ret.add(new WorkerSlot(id, port));
    +                }
    +            }
    +            return ret;
    +        }
    +
    +        @Override
    +        public void assignSlots(Topologies topologies, Map<String, 
Collection<WorkerSlot>> newSlotsByTopologyId) {
    +            //NOOP
    +        }
    +
    +        @Override
    +        public String getHostName(Map<String, SupervisorDetails> 
supervisors, String nodeId) {
    +            SupervisorDetails sd = supervisors.get(nodeId);
    +            if (sd != null) {
    +                return sd.getHost();
    +            }
    +            return null;
    +        }
    +
    +        @Override
    +        public IScheduler getForcedScheduler() {
    +            return null;
    +        }
    +        
    +    };
    +    
    +    private static class CommonTopoInfo {
    +        public Map<String, Object> topoConf;
    +        public String topoName;
    +        public StormTopology topology;
    +        public Map<Integer, String> taskToComponent;
    +        public StormBase base;
    +        public int launchTimeSecs;
    +        public Assignment assignment;
    +        public Map<List<Integer>, Map<String, Object>> beats;
    +        public HashSet<String> allComponents;
    +
    +    }
    +    
    +    @SuppressWarnings("deprecation")
    +    private static <T extends AutoCloseable> TimeCacheMap<String, T> 
fileCacheMap(Map<String, Object> conf) {
    +        return new 
TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 
600),
    +                (id, stream) -> {
    +                    try {
    +                        stream.close();
    +                    } catch (Exception e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
    +    }
    +
    +    private static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> 
first, Map<? extends K, ? extends V> other) {
    +        Map<K, V> ret = new HashMap<>(first);
    +        if (other != null) {
    +            ret.putAll(other);
    +        }
    +        return ret;
    +    }
    +    
    +    private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> 
first, Map<? extends K, ? extends V> second) {
    +        Map<K, V> ret = new HashMap<>();
    +        for (Entry<? extends K, ? extends V> entry: second.entrySet()) {
    +            if (!entry.getValue().equals(first.get(entry.getKey()))) {
    +                ret.put(entry.getKey(), entry.getValue());
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    private static IScheduler makeScheduler(Map<String, Object> conf, 
INimbus inimbus) {
    +        String schedClass = (String) conf.get(Config.STORM_SCHEDULER);
    +        IScheduler scheduler = inimbus == null ? null : 
inimbus.getForcedScheduler();
    +        if (scheduler != null) {
    +            LOG.info("Using forced scheduler from INimbus {} {}", 
scheduler.getClass(), scheduler);
    +        } else if (schedClass != null) {
    +            LOG.info("Using custom scheduler: {}", schedClass);
    +            scheduler = Utils.newInstance(schedClass);
    +        } else {
    +            LOG.info("Using default scheduler");
    +            scheduler = new DefaultScheduler();
    +        }
    +        scheduler.prepare(conf);
    +        return scheduler;
    +    }
    +
    +    /**
    +     * Constructs a TimeCacheMap instance with a blob store timeout whose
    +     * expiration callback invokes cancel on the value held by an expired 
entry when
    +     * that value is an AtomicOutputStream and calls close otherwise.
    +     * @param conf the config to use
    +     * @return the newly created map
    +     */
    +    @SuppressWarnings("deprecation")
    +    private static <T extends AutoCloseable> TimeCacheMap<String, T> 
makeBlobCacheMap(Map<String, Object> conf) {
    +        return new 
TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 
600),
    +                (id, stream) -> {
    +                    try {
    +                        if (stream instanceof AtomicOutputStream) {
    +                            ((AtomicOutputStream) stream).cancel();
    +                        } else {
    +                            stream.close();
    +                        }
    +                    } catch (Exception e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
    +    }
    +    
    +    /**
    +     * Constructs a TimeCacheMap instance with a blobstore timeout and no 
callback function.
    +     * @param conf
    +     * @return
    +     */
    +    @SuppressWarnings("deprecation")
    +    private static TimeCacheMap<String, Iterator<String>> 
makeBlobListCachMap(Map<String, Object> conf) {
    +        return new 
TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 
600));
    +    }
    +    
    +    private static ITopologyActionNotifierPlugin 
createTopologyActionNotifier(Map<String, Object> conf) {
    +        String clazz = (String) 
conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
    +        ITopologyActionNotifierPlugin ret = null;
    +        if (clazz != null && !clazz.isEmpty()) {
    +            ret = Utils.newInstance(clazz);
    +            try {
    +                ret.prepare(conf);
    +            } catch (Exception e) {
    +                LOG.warn("Ignoring exception, Could not initialize {}", 
clazz, e);
    +                ret = null;
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    @SuppressWarnings("unchecked")
    +    private static List<ClusterMetricsConsumerExecutor> 
makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
    +        Collection<Map<String, Object>> consumers = 
(Collection<Map<String, Object>>) 
conf.get(Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
    +        List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>();
    +        if (consumers != null) {
    +            for (Map<String, Object> consumer : consumers) {
    +                ret.add(new ClusterMetricsConsumerExecutor((String) 
consumer.get("class"), consumer.get("argument")));
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    private static Subject getSubject() {
    +        return ReqContext.context().subject();
    +    }
    +    
    +    static Map<String, Object> readTopoConf(String topoId, BlobStore 
blobStore) throws KeyNotFoundException, AuthorizationException, IOException {
    +        return blobStore.readTopologyConf(topoId, getSubject());
    +    }
    +    
    +    static List<String> getKeyListFromId(Map<String, Object> conf, String 
id) {
    +        List<String> ret = new ArrayList<>(3);
    +        ret.add(ConfigUtils.masterStormCodeKey(id));
    +        ret.add(ConfigUtils.masterStormConfKey(id));
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            ret.add(ConfigUtils.masterStormJarKey(id));
    +        }
    +        return ret;
    +    }
    +    
    +    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, 
Map<String, Object> conf) {
    +        KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo);
    +        return kseq.getKeySequenceNumber(conf);
    +    }
    +    
    +    private static StormTopology readStormTopology(String topoId, 
BlobStore store) throws KeyNotFoundException, AuthorizationException, 
IOException {
    +        return store.readTopology(topoId, getSubject());
    +    }
    +    
    +    private static Map<String, Object> readTopoConfAsNimbus(String topoId, 
BlobStore store) throws KeyNotFoundException, AuthorizationException, 
IOException {
    +        return store.readTopologyConf(topoId, NIMBUS_SUBJECT);
    +    }
    +    
    +    private static StormTopology readStormTopologyAsNimbus(String topoId, 
BlobStore store) throws KeyNotFoundException, AuthorizationException, 
IOException {
    +        return store.readTopology(topoId, NIMBUS_SUBJECT);
    +    }
    +    
    +    /**
    +     * convert {topology-id -> SchedulerAssignment} to
    +     *         {topology-id -> {executor [node port]}}
    +     * @return
    +     */
    +    private static Map<String, Map<List<Long>, List<Object>>> 
computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) {
    +        Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
    +        for (Entry<String, SchedulerAssignment> schedEntry: 
schedAssignments.entrySet()) {
    +            Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
    +            for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort: 
schedEntry.getValue().getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec = execAndNodePort.getKey();
    +                WorkerSlot slot = execAndNodePort.getValue();
    +                
    +                List<Long> listExec = new ArrayList<>(2);
    +                listExec.add((long) exec.getStartTask());
    +                listExec.add((long) exec.getEndTask());
    +                
    +                List<Object> nodePort = new ArrayList<>(2);
    +                nodePort.add(slot.getNodeId());
    +                nodePort.add((long)slot.getPort());
    +                
    +                execToNodePort.put(listExec, nodePort);
    +            }
    +            ret.put(schedEntry.getKey(), execToNodePort);
    +        }
    +        return ret;
    +    }
    +    
    +    private static int numUsedWorkers(SchedulerAssignment assignment) {
    +        if (assignment == null) {
    +            return 0;
    +        }
    +        return assignment.getSlots().size();
    +    }
    +    
    +    /**
    +     * convert {topology-id -> SchedulerAssignment} to
    +     *         {topology-id -> {[node port] [mem-on-heap mem-off-heap 
cpu]}}
    +     * Make sure this can deal with other non-RAS schedulers
    +     * later we may further support map-for-any-resources
    +     * @param schedAssignments the assignments
    +     * @return  {topology-id {[node port] [mem-on-heap mem-off-heap cpu]}}
    +     */
    +    private static Map<String, Map<List<Object>, List<Double>>> 
computeTopoToNodePortToResources(Map<String, SchedulerAssignment> 
schedAssignments) {
    +        Map<String, Map<List<Object>, List<Double>>> ret = new HashMap<>();
    +        for (Entry<String, SchedulerAssignment> schedEntry: 
schedAssignments.entrySet()) {
    +            Map<List<Object>, List<Double>> nodePortToResources = new 
HashMap<>();
    +            for (WorkerSlot slot: 
schedEntry.getValue().getExecutorToSlot().values()) {
    +                List<Object> nodePort = new ArrayList<>(2);
    +                nodePort.add(slot.getNodeId());
    +                nodePort.add((long)slot.getPort());
    +                
    +                List<Double> resources = new ArrayList<>(3);
    +                resources.add(slot.getAllocatedMemOnHeap());
    +                resources.add(slot.getAllocatedMemOffHeap());
    +                resources.add(slot.getAllocatedCpu());
    +                
    +                nodePortToResources.put(nodePort, resources);
    +            }
    +            ret.put(schedEntry.getKey(), nodePortToResources);
    +        }
    +        return ret;
    +    }
    +
    +    private static Map<String, Map<List<Long>, List<Object>>> 
computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> 
schedAssignments,
    +            Map<String, Assignment> existingAssignments) {
    +        Map<String, Map<List<Long>, List<Object>>> ret = 
computeTopoToExecToNodePort(schedAssignments);
    +        // Print some useful information
    +        if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
    +            for (Entry<String, Map<List<Long>, List<Object>>> entry: 
ret.entrySet()) {
    +                String topoId = entry.getKey();
    +                Map<List<Long>, List<Object>> execToNodePort = 
entry.getValue();
    +                Assignment assignment = existingAssignments.get(topoId);
    +                if (assignment == null) {
    +                    continue;
    +                }
    +                Map<List<Long>, NodeInfo> old = 
assignment.get_executor_node_port();
    +                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
    +                for (Entry<List<Long>, List<Object>> execAndNodePort: 
execToNodePort.entrySet()) {
    +                    NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
    +                    String node = (String) 
execAndNodePort.getValue().get(0);
    +                    Long port = (Long) execAndNodePort.getValue().get(1);
    +                    if (oldAssigned == null || 
!oldAssigned.get_node().equals(node) 
    +                            || 
!port.equals(oldAssigned.get_port_iterator().next())) {
    +                        reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
    +                    }
    +                }
    +
    +                if (!reassigned.isEmpty()) {
    +                    int count = (new 
HashSet<>(execToNodePort.values())).size();
    +                    Set<List<Long>> reExecs = reassigned.keySet();
    +                    LOG.info("Reassigning {} to {} slots", topoId, count);
    +                    LOG.info("Reassign executors: {}", reExecs);
    +                }
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    private static List<List<Long>> changedExecutors(Map<List<Long>, 
NodeInfo> map,
    +            Map<List<Long>, List<Object>> newExecToNodePort) {
    +        HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null 
? new HashMap<>() : Utils.reverseMap(map);
    +        HashMap<List<Object>, List<List<Long>>> slotAssigned = new 
HashMap<>();
    +        for (Entry<NodeInfo, List<List<Long>>> entry: 
tmpSlotAssigned.entrySet()) {
    +            NodeInfo ni = entry.getKey();
    +            List<Object> key = new ArrayList<>(2);
    +            key.add(ni.get_node());
    +            key.add(ni.get_port_iterator().next());
    +            List<List<Long>> value = new ArrayList<>(entry.getValue());
    +            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
    +            slotAssigned.put(key, value);
    +        }
    +        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = 
newExecToNodePort == null ? new HashMap<>() : 
Utils.reverseMap(newExecToNodePort);
    +        HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new 
HashMap<>();
    +        for (Entry<List<Object>, List<List<Long>>> entry: 
tmpNewSlotAssigned.entrySet()) {
    +            List<List<Long>> value = new ArrayList<>(entry.getValue());
    +            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
    +            newSlotAssigned.put(entry.getKey(), value);
    +        }
    +        Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, 
newSlotAssigned);
    +        List<List<Long>> ret = new ArrayList<>();
    +        for (List<List<Long>> val: diff.values()) {
    +            ret.addAll(val);
    +        }
    +        return ret;
    +    }
    +
    +    private static Set<WorkerSlot> newlyAddedSlots(Assignment old, 
Assignment current) {
    +        Set<NodeInfo> oldSlots = new 
HashSet<>(old.get_executor_node_port().values());
    +        Set<NodeInfo> niRet = new 
HashSet<>(current.get_executor_node_port().values());
    +        niRet.removeAll(oldSlots);
    +        Set<WorkerSlot> ret = new HashSet<>();
    +        for (NodeInfo ni: niRet) {
    +            ret.add(new WorkerSlot(ni.get_node(), 
ni.get_port_iterator().next()));
    +        }
    +        return ret;
    +    }
    +    
    +    private static Map<String, SupervisorDetails> 
basicSupervisorDetailsMap(IStormClusterState state) {
    +        Map<String, SupervisorDetails> ret = new HashMap<>();
    +        for (Entry<String, SupervisorInfo> entry: 
state.allSupervisorInfo().entrySet()) {
    +            String id = entry.getKey();
    +            SupervisorInfo info = entry.getValue();
    +            ret.put(id, new SupervisorDetails(id, info.get_hostname(), 
info.get_scheduler_meta(), null,
    +                    info.get_resources_map()));
    +        }
    +        return ret;
    +    }
    +    
    +    private static boolean isTopologyActive(IStormClusterState state, 
String topoName) {
    +        return state.getTopoId(topoName).isPresent();
    +    }
    +    
    +    private static Map<String, Object> tryReadTopoConf(String topoId, 
BlobStore store) throws NotAliveException, AuthorizationException, IOException {
    +        try {
    +            return readTopoConfAsNimbus(topoId, store);
    +            //Was a try-cause but I looked at the code around this and key 
not found is not wrapped in runtime,
    +            // so it is not needed
    +        } catch (KeyNotFoundException e) {
    +            if (topoId == null) {
    +                throw new NullPointerException();
    +            }
    +            throw new NotAliveException(topoId);
    +        }
    +    }
    +    
    +    private static final List<String> EMPTY_STRING_LIST = 
Collections.unmodifiableList(Collections.emptyList());
    +    private static final Set<String> EMPTY_STRING_SET = 
Collections.unmodifiableSet(Collections.emptySet());
    +    
    +    @VisibleForTesting
    +    public static Set<String> topoIdsToClean(IStormClusterState state, 
BlobStore store) {
    +        Set<String> ret = new HashSet<>();
    +        ret.addAll(OR(state.heartbeatStorms(), EMPTY_STRING_LIST));
    +        ret.addAll(OR(state.errorTopologies(), EMPTY_STRING_LIST));
    +        ret.addAll(OR(store.storedTopoIds(), EMPTY_STRING_SET));
    +        ret.addAll(OR(state.backpressureTopologies(), EMPTY_STRING_LIST));
    +        ret.removeAll(OR(state.activeStorms(), EMPTY_STRING_LIST));
    +        return ret;
    +    }
    +    
    +    private static String extractStatusStr(StormBase base) {
    +        String ret = null;
    +        TopologyStatus status = base.get_status();
    +        if (status != null) {
    +            ret = status.name().toUpperCase();
    +        }
    +        return ret;
    +    }
    +    
    +    private static int componentParallelism(Map<String, Object> topoConf, 
Object component) throws InvalidTopologyException {
    +        Map<String, Object> combinedConf = merge(topoConf, 
StormCommon.componentConf(component));
    +        int numTasks = 
Utils.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), 
StormCommon.numStartExecutors(component));
    +        Integer maxParallel = 
Utils.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
    +        int ret = numTasks;
    +        if (maxParallel != null) {
    +            ret = Math.min(maxParallel, numTasks);
    +        }
    +        return ret;
    +    }
    +    
    +    private static StormTopology normalizeTopology(Map<String, Object> 
topoConf, StormTopology topology) throws InvalidTopologyException {
    +        StormTopology ret = topology.deepCopy();
    +        for (Object comp: StormCommon.allComponents(ret).values()) {
    +            Map<String, Object> mergedConf = 
StormCommon.componentConf(comp);
    +            mergedConf.put(Config.TOPOLOGY_TASKS, 
componentParallelism(topoConf, comp));
    +            String jsonConf = JSONValue.toJSONString(mergedConf);
    +            StormCommon.getComponentCommon(comp).set_json_conf(jsonConf);
    +        }
    +        return ret;
    +    }
    +    
    +    private static void addToDecorators(Set<String> decorators, 
List<String> conf) {
    +        if (conf != null) {
    +            decorators.addAll(conf);
    +        }
    +    }
    +    
    +    @SuppressWarnings("unchecked")
    +    private static void addToSerializers(Map<String, String> ser, 
List<Object> conf) {
    +        if (conf != null) {
    +            for (Object o: conf) {
    +                if (o instanceof Map) {
    +                    ser.putAll((Map<String,String>)o);
    +                } else {
    +                    ser.put((String)o, null);
    +                }
    +            }
    +        }
    +    }
    +    
    +    @SuppressWarnings("unchecked")
    +    private static Map<String, Object> normalizeConf(Map<String,Object> 
conf, Map<String, Object> topoConf, StormTopology topology) {
    +        //ensure that serializations are same for all tasks no matter 
what's on
    +        // the supervisors. this also allows you to declare the 
serializations as a sequence
    +        List<Map<String, Object>> allConfs = new ArrayList<>();
    +        for (Object comp: StormCommon.allComponents(topology).values()) {
    +            allConfs.add(StormCommon.componentConf(comp));
    +        }
    +
    +        Set<String> decorators = new HashSet<>();
    +        //Yes we are putting in a config that is not the same type we 
pulled out.
    +        Map<String, String> serializers = new HashMap<>();
    +        for (Map<String, Object> c: allConfs) {
    +            addToDecorators(decorators, (List<String>) 
c.get(Config.TOPOLOGY_KRYO_DECORATORS));
    +            addToSerializers(serializers, (List<Object>) 
c.get(Config.TOPOLOGY_KRYO_REGISTER));
    +        }
    +        addToDecorators(decorators, 
(List<String>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS, 
    +                conf.get(Config.TOPOLOGY_KRYO_DECORATORS)));
    +        addToSerializers(serializers, 
(List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, 
    +                conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
    +        
    +        Map<String, Object> mergedConf = merge(conf, topoConf);
    +        Map<String, Object> ret = new HashMap<>(topoConf);
    +        ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers);
    +        ret.put(Config.TOPOLOGY_KRYO_DECORATORS, new 
ArrayList<>(decorators));
    +        ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, 
mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +        ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, 
mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
    +        ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 
mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
    +        return ret;
    +    }
    +    
    +    private static void rmBlobKey(BlobStore store, String key, 
IStormClusterState state) {
    +        try {
    +            store.deleteBlob(key, NIMBUS_SUBJECT);
    +            if (store instanceof LocalFsBlobStore) {
    +                state.removeBlobstoreKey(key);
    +            }
    +        } catch (Exception e) {
    +            //Yes eat the exception
    +            LOG.info("Exception {}", e);
    +        }
    +    }
    +    
    +    /**
    +     * Deletes jar files in dirLoc older than seconds.
    +     * @param dirLoc the location to look in for file
    +     * @param seconds how old is too old and should be deleted
    +     */
    +    @VisibleForTesting
    +    public static void cleanInbox(String dirLoc, int seconds) {
    +        final long now = Time.currentTimeMillis();
    +        final long ms = Time.secsToMillis(seconds);
    +        File dir = new File(dirLoc);
    +        for (File f : dir.listFiles((f) -> f.isFile() && 
((f.lastModified() + ms) <= now))) {
    +            if (f.delete()) {
    +                LOG.info("Cleaning inbox ... deleted: {}", f.getName());
    +            } else {
    +                LOG.error("Cleaning inbox ... error deleting: {}", 
f.getName());
    +            }
    +        }
    +    }
    +    
    +    private static ExecutorInfo toExecInfo(List<Long> exec) {
    +        return new ExecutorInfo(exec.get(0).intValue(), 
exec.get(1).intValue());
    +    }
    +    
    +    private static final Pattern TOPOLOGY_NAME_REGEX = 
Pattern.compile("^[^/.:\\\\]+$");
    +    private static void validateTopologyName(String name) throws 
InvalidTopologyException {
    +        Matcher m = TOPOLOGY_NAME_REGEX.matcher(name);
    +        if (!m.matches()) {
    +            throw new InvalidTopologyException("Topology name must match " 
+ TOPOLOGY_NAME_REGEX);
    +        }
    +    }
    +    
    +    private static StormTopology tryReadTopology(String topoId, BlobStore 
store) throws NotAliveException, AuthorizationException, IOException {
    +        try {
    +            return readStormTopologyAsNimbus(topoId, store);
    +        } catch (KeyNotFoundException e) {
    +            throw new NotAliveException(topoId);
    +        }
    +    }
    +    
    +    private static void validateTopologySize(Map<String, Object> topoConf, 
Map<String, Object> nimbusConf, StormTopology topology) throws 
InvalidTopologyException {
    +        int workerCount = 
Utils.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 1);
    +        Integer allowedWorkers = 
Utils.getInt(nimbusConf.get(Config.NIMBUS_SLOTS_PER_TOPOLOGY), null);
    +        int executorsCount = 0;
    +        for (Object comp : StormCommon.allComponents(topology).values()) {
    +            executorsCount += StormCommon.numStartExecutors(comp);
    +        }
    +        Integer allowedExecutors = 
Utils.getInt(nimbusConf.get(Config.NIMBUS_EXECUTORS_PER_TOPOLOGY), null);
    +        if (allowedExecutors != null && executorsCount > allowedExecutors) 
{
    +            throw new InvalidTopologyException("Failed to submit topology. 
Topology requests more than " +
    +                    allowedExecutors + " executors.");
    +        }
    +        
    +        if (allowedWorkers != null && workerCount > allowedWorkers) {
    +            throw new InvalidTopologyException("Failed to submit topology. 
Topology requests more than " +
    +                    allowedWorkers + " workers.");
    +        }
    +    }
    +    
    +    private static void setLoggerTimeouts(LogLevel level) {
    +        int timeoutSecs = level.get_reset_log_level_timeout_secs();
    +        if (timeoutSecs > 0) {
    +            level.set_reset_log_level_timeout_epoch(Time.currentTimeSecs() 
+ timeoutSecs);
    +        } else {
    +            level.unset_reset_log_level_timeout_epoch();
    +        }
    +    }
    +    
    +    @VisibleForTesting
    +    public static List<String> topologiesOnSupervisor(Map<String, 
Assignment> assignments, String supervisorId) {
    +        Set<String> ret = new HashSet<>();
    +        for (Entry<String, Assignment> entry: assignments.entrySet()) {
    +            Assignment assignment = entry.getValue();
    +            for (NodeInfo nodeInfo: 
assignment.get_executor_node_port().values()) {
    +                if (supervisorId.equals(nodeInfo.get_node())) {
    +                    ret.add(entry.getKey());
    +                    break;
    +                }
    +            }
    +        }
    +        
    +        return new ArrayList<>(ret);
    +    }
    +    
    +    private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() {
    +        return new 
IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs());
    +    }
    +    
    +    private static List<DataPoint> extractClusterMetrics(ClusterSummary 
summ) {
    +        List<DataPoint> ret = new ArrayList<>();
    +        ret.add(new DataPoint("supervisors", summ.get_supervisors_size()));
    +        ret.add(new DataPoint("topologies", summ.get_topologies_size()));
    +        
    +        int totalSlots = 0;
    +        int usedSlots = 0;
    +        for (SupervisorSummary sup: summ.get_supervisors()) {
    +            usedSlots += sup.get_num_used_workers();
    +            totalSlots += sup.get_num_workers();
    +        }
    +        ret.add(new DataPoint("slotsTotal", totalSlots));
    +        ret.add(new DataPoint("slotsUsed", usedSlots));
    +        ret.add(new DataPoint("slotsFree", totalSlots - usedSlots));
    +        
    +        int totalExecutors = 0;
    +        int totalTasks = 0;
    +        for (TopologySummary topo: summ.get_topologies()) {
    +            totalExecutors += topo.get_num_executors();
    +            totalTasks += topo.get_num_tasks();
    +        }
    +        ret.add(new DataPoint("executorsTotal", totalExecutors));
    +        ret.add(new DataPoint("tasksTotal", totalTasks));
    +        return ret;
    +    }
    +
    +    private static Map<IClusterMetricsConsumer.SupervisorInfo, 
List<DataPoint>> extractSupervisorMetrics(ClusterSummary summ) {
    +        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> ret = 
new HashMap<>();
    +        for (SupervisorSummary sup: summ.get_supervisors()) {
    +            IClusterMetricsConsumer.SupervisorInfo info = new 
IClusterMetricsConsumer.SupervisorInfo(sup.get_host(), sup.get_supervisor_id(), 
Time.currentTimeSecs());
    +            List<DataPoint> metrics = new ArrayList<>();
    +            metrics.add(new DataPoint("slotsTotal", 
sup.get_num_workers()));
    +            metrics.add(new DataPoint("slotsUsed", 
sup.get_num_used_workers()));
    +            metrics.add(new DataPoint("totalMem", 
sup.get_total_resources().get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)));
    +            metrics.add(new DataPoint("totalCpu", 
sup.get_total_resources().get(Config.SUPERVISOR_CPU_CAPACITY)));
    +            metrics.add(new DataPoint("usedMem", sup.get_used_mem()));
    +            metrics.add(new DataPoint("usedCpu", sup.get_used_cpu()));
    +            ret.put(info, metrics);
    +        }
    +        return ret;
    +    }
    +    
    +    private static Map<String, Double> 
setResourcesDefaultIfNotSet(Map<String, Map<String, Double>> compResourcesMap, 
String compId, Map<String, Object> topoConf) {
    +        Map<String, Double> resourcesMap = compResourcesMap.get(compId);
    +        if (resourcesMap == null) {
    +            resourcesMap = new HashMap<>();
    +        }
    +        ResourceUtils.checkIntialization(resourcesMap, compId, topoConf);
    +        return resourcesMap;
    +    }
    +    
    +    private static void validatePortAvailable(Map<String, Object> conf) 
throws IOException {
    +        int port = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
    +        try (ServerSocket socket = new ServerSocket(port)) {
    +            //Nothing
    +        } catch (BindException e) {
    +            LOG.error("{} is not available. Check if another process is 
already listening on {}", port, port);
    +            System.exit(0);
    +        }
    +    }
    +    
    +    private static Nimbus launchServer(Map<String, Object> conf, INimbus 
inimbus) throws Exception {
    +        StormCommon.validateDistributedMode(conf);
    +        validatePortAvailable(conf);
    +        final Nimbus nimbus = new Nimbus(conf, inimbus);
    +        nimbus.launchServer();
    +        final ThriftServer server = new ThriftServer(conf, new 
Processor<>(nimbus), ThriftConnectionType.NIMBUS);
    +        Utils.addShutdownHookWithForceKillIn1Sec(() -> {
    +            nimbus.shutdown();
    +            server.stop();
    +        });
    +        LOG.info("Starting nimbus server for storm version '{}'", 
STORM_VERSION);
    +        server.serve();
    +        return nimbus;
    +    }
    +    
    +    public static Nimbus launch(INimbus inimbus) throws Exception {
    +        Map<String, Object> conf = merge(ConfigUtils.readStormConfig(),
    +                ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", 
false));
    +        return launchServer(conf, inimbus);
    +    }
    +    
    +    public static void main(String[] args) throws Exception {
    +        Utils.setupDefaultUncaughtExceptionHandler();
    +        launch(new StandaloneINimbus());
    +    }
    +    
    +    private final Map<String, Object> conf;
    +    private final NimbusInfo nimbusHostPortInfo;
    +    private final INimbus inimbus;
    +    private IAuthorizer authorizationHandler;
    +    private final IAuthorizer impersonationAuthorizationHandler;
    +    private final AtomicLong submittedCount;
    +    private final IStormClusterState stormClusterState;
    +    private final Object submitLock;
    +    private final Object credUpdateLock;
    +    private final AtomicReference<Map<String, Map<List<Integer>, 
Map<String, Object>>>> heartbeatsCache;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, BufferInputStream> downloaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, WritableByteChannel> uploaders;
    +    private final BlobStore blobStore;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, OutputStream> blobUploaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, Iterator<String>> blobListers;
    +    private final UptimeComputer uptime;
    +    private final ITopologyValidator validator;
    +    private final StormTimer timer;
    +    private final IScheduler scheduler;
    +    private final ILeaderElector leaderElector;
    +    private final AtomicReference<Map<String, String>> idToSchedStatus;
    +    private final AtomicReference<Map<String, Double[]>> nodeIdToResources;
    +    private final AtomicReference<Map<String, TopologyResources>> 
idToResources;
    +    private final AtomicReference<Map<String, Map<WorkerSlot, 
WorkerResources>>> idToWorkerResources;
    +    private final Collection<ICredentialsRenewer> credRenewers;
    +    private final Object topologyHistoryLock;
    +    private final LocalState topologyHistoryState;
    +    private final Collection<INimbusCredentialPlugin> 
nimbusAutocredPlugins;
    +    private final ITopologyActionNotifierPlugin 
nimbusTopologyActionNotifier;
    +    private final List<ClusterMetricsConsumerExecutor> 
clusterConsumerExceutors;
    +    private final IGroupMappingServiceProvider groupMapper;
    +    private final IPrincipalToLocal principalToLocal;
    +    
    +    private static IStormClusterState makeStormClusterState(Map<String, 
Object> conf) throws Exception {
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = ZK_ACLS;
    +        }
    +        return ClusterUtils.mkStormClusterState(conf, acls, new 
ClusterStateContext(DaemonType.NIMBUS));
    +    }
    +    
    +    public Nimbus(Map<String, Object> conf, INimbus inimbus) throws 
Exception {
    +        this(conf, inimbus, null, null, null, null, null);
    +    }
    +    
    +    public Nimbus(Map<String, Object> conf, INimbus inimbus, 
IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
    +            BlobStore blobStore, ILeaderElector leaderElector, 
IGroupMappingServiceProvider groupMapper) throws Exception {
    +        this.conf = conf;
    +        if (hostPortInfo == null) {
    +            hostPortInfo = NimbusInfo.fromConf(conf);
    +        }
    +        this.nimbusHostPortInfo = hostPortInfo;
    +        if (inimbus != null) {
    +            inimbus.prepare(conf, ConfigUtils.masterInimbusDir(conf));
    +        }
    +        
    +        this.inimbus = inimbus;
    +        this.authorizationHandler = 
StormCommon.mkAuthorizationHandler((String) conf.get(Config.NIMBUS_AUTHORIZER), 
conf);
    +        this.impersonationAuthorizationHandler = 
StormCommon.mkAuthorizationHandler((String) 
conf.get(Config.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
    +        this.submittedCount = new AtomicLong(0);
    +        if (stormClusterState == null) {
    +            stormClusterState =  makeStormClusterState(conf);
    +        }
    +        this.stormClusterState = stormClusterState;
    +        this.submitLock = new Object();
    +        this.credUpdateLock = new Object();
    +        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
    +        this.downloaders = fileCacheMap(conf);
    +        this.uploaders = fileCacheMap(conf);
    +        if (blobStore == null) {
    +            blobStore = Utils.getNimbusBlobStore(conf, 
this.nimbusHostPortInfo);
    +        }
    +        this.blobStore = blobStore;
    +        this.blobDownloaders = makeBlobCacheMap(conf);
    +        this.blobUploaders = makeBlobCacheMap(conf);
    +        this.blobListers = makeBlobListCachMap(conf);
    +        this.uptime = Utils.makeUptimeComputer();
    +        this.validator = Utils.newInstance((String) 
conf.getOrDefault(Config.NIMBUS_TOPOLOGY_VALIDATOR, 
DefaultTopologyValidator.class.getName()));
    +        this.timer = new StormTimer(null, (t, e) -> {
    +            LOG.error("Error while processing event", e);
    +            Utils.exitProcess(20, "Error while processing event");
    +        });
    +        this.scheduler = makeScheduler(conf, inimbus);
    +        if (leaderElector == null) {
    +            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore);
    +        }
    +        this.leaderElector = leaderElector;
    +        this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
    +        this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
    +        this.idToResources = new AtomicReference<>(new HashMap<>());
    +        this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
    +        this.credRenewers = AuthUtils.GetCredentialRenewers(conf);
    +        this.topologyHistoryLock = new Object();
    +        this.topologyHistoryState = 
ConfigUtils.nimbusTopoHistoryState(conf);
    +        this.nimbusAutocredPlugins = 
AuthUtils.getNimbusAutoCredPlugins(conf);
    +        this.nimbusTopologyActionNotifier = 
createTopologyActionNotifier(conf);
    +        this.clusterConsumerExceutors = 
makeClusterMetricsConsumerExecutors(conf);
    +        if (groupMapper == null) {
    +            groupMapper = 
AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
    +        }
    +        this.groupMapper = groupMapper;
    +        this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
    +    }
    +
    +    Map<String, Object> getConf() {
    +        return conf;
    +    }
    +    
    +    @VisibleForTesting
    +    public void setAuthorizationHandler(IAuthorizer authorizationHandler) {
    +        this.authorizationHandler = authorizationHandler;
    +    }
    +
    +    private IStormClusterState getStormClusterState() {
    +        return stormClusterState;
    +    }
    +    
    +    @VisibleForTesting
    +    public 
AtomicReference<Map<String,Map<List<Integer>,Map<String,Object>>>> 
getHeartbeatsCache() {
    +        return heartbeatsCache;
    +    }
    +
    +    private BlobStore getBlobStore() {
    +        return blobStore;
    +    }
    +    
    +    private boolean isLeader() throws Exception {
    +        return leaderElector.isLeader();
    +    }
    +    
    +    private void assertIsLeader() throws Exception {
    +        if (!isLeader()) {
    +            NimbusInfo leaderAddress = leaderElector.getLeader();
    +            throw new RuntimeException("not a leader, current leader is " 
+ leaderAddress);
    +        }
    +    }
    +    
    +    private String getInbox() throws IOException {
    +        return ConfigUtils.masterInbox(conf);
    +    }
    +    
    +    void delayEvent(String topoId, int delaySecs, TopologyActions event, 
Object args) {
    +        LOG.info("Delaying event {} for {} secs for {}", event, delaySecs, 
topoId);
    +        timer.schedule(delaySecs, () -> {
    +            try {
    +                transition(topoId, event, args, false);
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        });
    +    }
    +
    +    void doRebalance(String topoId, StormBase stormBase) throws Exception {
    +        RebalanceOptions rbo = 
stormBase.get_topology_action_options().get_rebalance_options();
    +        StormBase updated = new StormBase();
    +        updated.set_topology_action_options(null);
    +        updated.set_component_debug(Collections.emptyMap());
    +        
    +        if (rbo.is_set_num_executors()) {
    +            updated.set_component_executors(rbo.get_num_executors());
    +        }
    +        
    +        if (rbo.is_set_num_workers()) {
    +            updated.set_num_workers(rbo.get_num_workers());
    +        }
    +        stormClusterState.updateStorm(topoId, updated);
    +        mkAssignments(topoId);
    +    }
    +    
    +    private String toTopoId(String topoName) throws NotAliveException {
    +        return stormClusterState.getTopoId(topoName)
    +                .orElseThrow(() -> new NotAliveException(topoName + " is 
not alive"));
    +    }
    +    
    +    private void transitionName(String topoName, TopologyActions event, 
Object eventArg, boolean errorOnNoTransition) throws Exception {
    +        transition(toTopoId(topoName), event, eventArg, 
errorOnNoTransition);
    +    }
    +
    +    private void transition(String topoId, TopologyActions event, Object 
eventArg) throws Exception {
    +        transition(topoId, event, eventArg, false);
    +    }
    +    
    +    private void transition(String topoId, TopologyActions event, Object 
eventArg, boolean errorOnNoTransition) throws Exception {
    +        LOG.info("TRANSITION: {} {} {} {}", topoId, event, eventArg, 
errorOnNoTransition);
    +        assertIsLeader();
    +        synchronized(submitLock) {
    +            IStormClusterState clusterState = stormClusterState;
    +            StormBase base = clusterState.stormBase(topoId, null);
    +            TopologyStatus status = base.get_status();
    +            if (status == null) {
    +                LOG.info("Cannot apply event {} to {} because topology no 
longer exists", event, topoId);
    +            } else {
    +                TopologyStateTransition transition = 
TOPO_STATE_TRANSITIONS.get(status).get(event);
    +                if (transition == null) {
    +                    String message = "No transition for event: " + event + 
", status: " + status + " storm-id: " + topoId;
    +                    if (errorOnNoTransition) {
    +                        throw new RuntimeException(message);
    +                    }
    +                    
    +                    if (TopologyActions.STARTUP != event) {
    +                        //STARTUP is a system event so don't log an issue
    +                        LOG.info(message);
    +                    }
    +                    transition = NOOP_TRANSITION;
    +                }
    +                StormBase updates = transition.transition(eventArg, this, 
topoId, base);
    +                if (updates != null) {
    +                    clusterState.updateStorm(topoId, updates);
    +                }
    +            }
    +        }
    +    }
    +    
    +    private void setupStormCode(Map<String, Object> conf, String topoId, 
String tmpJarLocation, 
    +            Map<String, Object> topoConf, StormTopology topology) throws 
Exception {
    +        Subject subject = getSubject();
    +        IStormClusterState clusterState = stormClusterState;
    +        BlobStore store = blobStore;
    +        String jarKey = ConfigUtils.masterStormJarKey(topoId);
    +        String codeKey = ConfigUtils.masterStormCodeKey(topoId);
    +        String confKey = ConfigUtils.masterStormConfKey(topoId);
    +        NimbusInfo hostPortInfo = nimbusHostPortInfo;
    +        if (tmpJarLocation != null) {
    +            //in local mode there is no jar
    +            try (FileInputStream fin = new 
FileInputStream(tmpJarLocation)) {
    +                store.createBlob(jarKey, fin, new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +            }
    +            if (store instanceof LocalFsBlobStore) {
    +                clusterState.setupBlobstore(jarKey, hostPortInfo, 
getVersionForKey(jarKey, hostPortInfo, conf));
    +            }
    +        }
    +        
    +        store.createBlob(confKey, Utils.toCompressedJsonConf(topoConf), 
new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +        if (store instanceof LocalFsBlobStore) {
    +            clusterState.setupBlobstore(confKey, hostPortInfo, 
getVersionForKey(confKey, hostPortInfo, conf));
    +        }
    +        
    +        store.createBlob(codeKey, Utils.serialize(topology), new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +        if (store instanceof LocalFsBlobStore) {
    +            clusterState.setupBlobstore(codeKey, hostPortInfo, 
getVersionForKey(codeKey, hostPortInfo, conf));
    +        }
    +    }
    +    
    +    private Integer getBlobReplicationCount(String key) throws Exception {
    +        BlobStore store = blobStore;
    +        if (store != null) {
    +            return store.getBlobReplication(key, NIMBUS_SUBJECT);
    +        }
    +        return null;
    +    }
    +    
    +    private void waitForDesiredCodeReplication(Map<String, Object> 
topoConf, String topoId) throws Exception {
    +        int minReplicationCount = 
Utils.getInt(topoConf.get(Config.TOPOLOGY_MIN_REPLICATION_COUNT));
    +        int maxWaitTime = 
Utils.getInt(topoConf.get(Config.TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC));
    +        int jarCount = minReplicationCount;
    +        if (!ConfigUtils.isLocalMode(topoConf)) {
    +            jarCount = 
getBlobReplicationCount(ConfigUtils.masterStormJarKey(topoId));
    +        }
    +        int codeCount = 
getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId));
    +        int confCount = 
getBlobReplicationCount(ConfigUtils.masterStormConfKey(topoId));
    +        long totalWaitTime = 0;
    +        //When is this ever null?
    +        if (blobStore != null) {
    +            while (jarCount < minReplicationCount &&
    +                    codeCount < minReplicationCount &&
    +                    confCount < minReplicationCount) {
    +                if (maxWaitTime > 0 && totalWaitTime > maxWaitTime) {
    +                    LOG.info("desired replication count of {} not achieved 
but we have hit the max wait time {}"
    +                            + " so moving on with replication count for 
conf key = {} for code key = {} for jar key = ",
    +                            minReplicationCount, maxWaitTime, confCount, 
codeCount, jarCount);
    +                    return;
    +                }
    +                LOG.info("WAITING... {} <? {} {} {}", minReplicationCount, 
jarCount, codeCount, confCount);
    +                LOG.info("WAITING... {} <? {}", totalWaitTime, 
maxWaitTime);
    +                Time.sleepSecs(1);
    +                totalWaitTime++;
    +                if (!ConfigUtils.isLocalMode(topoConf)) {
    +                    jarCount = 
getBlobReplicationCount(ConfigUtils.masterStormJarKey(topoId));
    +                }
    +                codeCount = 
getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId));
    +                confCount = 
getBlobReplicationCount(ConfigUtils.masterStormConfKey(topoId));
    +            }
    +        }
    +        LOG.info("desired replication count {} achieved, 
current-replication-count for conf key = {},"
    +                + " current-replication-count for code key = {}, 
current-replication-count for jar key = {}", 
    +                minReplicationCount, confCount, codeCount, jarCount);
    +    }
    +    
    +    private TopologyDetails readTopologyDetails(String topoId) throws 
NotAliveException, KeyNotFoundException, AuthorizationException, IOException, 
InvalidTopologyException {
    +        StormBase base = stormClusterState.stormBase(topoId, null);
    +        if (base == null) {
    +            if (topoId == null) {
    +                throw new NullPointerException();
    +            }
    +            throw new NotAliveException(topoId);
    +        }
    +        BlobStore store = blobStore;
    +        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
    +        StormTopology topo = readStormTopologyAsNimbus(topoId, store);
    +        Map<List<Integer>, String> rawExecToComponent = 
computeExecutorToComponent(topoId);
    +        Map<ExecutorDetails, String> executorsToComponent = new 
HashMap<>();
    +        for (Entry<List<Integer>, String> entry: 
rawExecToComponent.entrySet()) {
    +            List<Integer> execs = entry.getKey();
    +            ExecutorDetails execDetails = new 
ExecutorDetails(execs.get(0), execs.get(1));
    +            executorsToComponent.put(execDetails, entry.getValue());
    +        }
    +        
    +        return new TopologyDetails(topoId, topoConf, topo, 
base.get_num_workers(), executorsToComponent, base.get_launch_time_secs());
    +    }
    +    
    +    private void updateHeartbeats(String topoId, Set<List<Integer>> 
allExecutors, Assignment existingAssignment) {
    +        LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
    +        IStormClusterState state = stormClusterState;
    +        Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertExecutorBeats(state.executorBeats(topoId, ex
    --- End diff --
    
    Nit: Should maybe be "cancel blob upload exception"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to