http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6581e04..6d40db9 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -24,7 +24,6 @@ import com.codahale.metrics.Meter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -57,7 +56,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -144,11 +142,11 @@ import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metric.api.DataPoint;
 import org.apache.storm.metric.api.IClusterMetricsConsumer;
 import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo;
-import org.apache.storm.nimbus.AssignmentDistributionService;
 import org.apache.storm.metricstore.AggLevel;
 import org.apache.storm.metricstore.Metric;
 import org.apache.storm.metricstore.MetricStore;
 import org.apache.storm.metricstore.MetricStoreConfig;
+import org.apache.storm.nimbus.AssignmentDistributionService;
 import org.apache.storm.nimbus.DefaultTopologyValidator;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
@@ -211,8 +209,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Nimbus implements Iface, Shutdownable, DaemonCommon {
+    @VisibleForTesting
+    public static final List<ACL> ZK_ACLS = 
Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
+                                                          new 
ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+    public static final SimpleVersion MIN_VERSION_SUPPORT_RPC_HEARTBEAT = new 
SimpleVersion("2.0.0");
     private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class);
-
     //    Metrics
     private static final Meter submitTopologyWithOptsCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
     private static final Meter submitTopologyCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-submitTopology-calls");
@@ -224,7 +225,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static final Meter debugCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
     private static final Meter setWorkerProfilerCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
     private static final Meter getComponentPendingProfileActionsCalls = 
StormMetricsRegistry.registerMeter(
-            "nimbus:num-getComponentPendingProfileActions-calls");
+        "nimbus:num-getComponentPendingProfileActions-calls");
     private static final Meter setLogConfigCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
     private static final Meter uploadNewCredentialsCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
     private static final Meter beginFileUploadCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
@@ -241,59 +242,47 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static final Meter getLeaderCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
     private static final Meter isTopologyNameAllowedCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
     private static final Meter getTopologyInfoWithOptsCalls = 
StormMetricsRegistry.registerMeter(
-            "nimbus:num-getTopologyInfoWithOpts-calls");
+        "nimbus:num-getTopologyInfoWithOpts-calls");
     private static final Meter getTopologyInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
     private static final Meter getTopologyPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
     private static final Histogram scheduleTopologyTimeMs = 
StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
-            new ExponentiallyDecayingReservoir());
+                                                                               
                    new ExponentiallyDecayingReservoir());
     private static final Meter getOwnerResourceSummariesCalls = 
StormMetricsRegistry.registerMeter(
-            "nimbus:num-getOwnerResourceSummaries-calls");
+        "nimbus:num-getOwnerResourceSummaries-calls");
+    // END Metrics
     private static final Meter shutdownCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     private static final Meter processWorkerMetricsCalls = 
StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
-    // END Metrics
-    
     private static final String STORM_VERSION = VersionInfo.getVersion();
-
-    @VisibleForTesting
-    public static final List<ACL> ZK_ACLS = 
Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
-            new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
-
-    public static final SimpleVersion MIN_VERSION_SUPPORT_RPC_HEARTBEAT = new 
SimpleVersion("2.0.0");
-
-    private static List<ACL> getNimbusAcls(Map<String, Object> conf) {
-        List<ACL> acls = null;
-        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = ZK_ACLS;
-        }
-        return acls;
-    }
-
     private static final Subject NIMBUS_SUBJECT = new Subject();
-
-    static {
-        NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
-        NIMBUS_SUBJECT.setReadOnly();
-    }
-
-    // TOPOLOGY STATE TRANSITIONS
-    private static StormBase make(TopologyStatus status) {
-        StormBase ret = new StormBase();
-        ret.set_status(status);
-        //The following are required for backwards compatibility with clojure 
code
-        ret.set_component_executors(Collections.emptyMap());
-        ret.set_component_debug(Collections.emptyMap());
-        return ret;
-    }
-    
     private static final TopologyStateTransition NOOP_TRANSITION = (arg, 
nimbus, topoId, base) -> null;
     private static final TopologyStateTransition INACTIVE_TRANSITION = (arg, 
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
     private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, 
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
+    private static final TopologyStateTransition REMOVE_TRANSITION = (args, 
nimbus, topoId, base) -> {
+        LOG.info("Killing topology: {}", topoId);
+        IStormClusterState state = nimbus.getStormClusterState();
+        Assignment oldAssignment = state.assignmentInfo(topoId, null);
+        state.removeStorm(topoId);
+        notifySupervisorsAsKilled(state, oldAssignment, 
nimbus.getAssignmentsDistributer());
+        BlobStore store = nimbus.getBlobStore();
+        if (store instanceof LocalFsBlobStore) {
+            for (String key : Nimbus.getKeyListFromId(nimbus.getConf(), 
topoId)) {
+                state.removeBlobstoreKey(key);
+                state.removeKeyVersion(key);
+            }
+        }
+        nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId));
+        return null;
+    };
+    private static final TopologyStateTransition DO_REBALANCE_TRANSITION = 
(args, nimbus, topoId, base) -> {
+        nimbus.doRebalance(topoId, base);
+        return Nimbus.make(base.get_prev_status());
+    };
     private static final TopologyStateTransition KILL_TRANSITION = (killTime, 
nimbus, topoId, base) -> {
         int delay = 0;
         if (killTime != null) {
-            delay = ((Number)killTime).intValue();
+            delay = ((Number) killTime).intValue();
         } else {
             delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, 
nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
@@ -309,7 +298,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         sb.set_component_debug(Collections.emptyMap());
         return sb;
     };
-    
+
     private static final TopologyStateTransition REBALANCE_TRANSITION = (args, 
nimbus, topoId, base) -> {
         RebalanceOptions rbo = ((RebalanceOptions) args).deepCopy();
         int delay = 0;
@@ -319,12 +308,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, 
nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
         nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
-        
+
         rbo.set_wait_secs(delay);
         if (!rbo.is_set_num_executors()) {
             rbo.set_num_executors(Collections.emptyMap());
         }
-        
+
         StormBase sb = new StormBase();
         sb.set_status(TopologyStatus.REBALANCING);
         sb.set_prev_status(base.get_status());
@@ -333,175 +322,228 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         sb.set_topology_action_options(tao);
         sb.set_component_executors(Collections.emptyMap());
         sb.set_component_debug(Collections.emptyMap());
-        
+
         return sb;
     };
-    
     private static final TopologyStateTransition 
STARTUP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> {
         int delay = 
base.get_topology_action_options().get_kill_options().get_wait_secs();
         nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
         return null;
     };
-    
-    private static final TopologyStateTransition REMOVE_TRANSITION = (args, 
nimbus, topoId, base) -> {
-        LOG.info("Killing topology: {}", topoId);
-        IStormClusterState state = nimbus.getStormClusterState();
-        Assignment oldAssignment = state.assignmentInfo(topoId, null);
-        state.removeStorm(topoId);
-        notifySupervisorsAsKilled(state, oldAssignment, 
nimbus.getAssignmentsDistributer());
-        BlobStore store = nimbus.getBlobStore();
-        if (store instanceof LocalFsBlobStore) {
-            for (String key: Nimbus.getKeyListFromId(nimbus.getConf(), 
topoId)) {
-                state.removeBlobstoreKey(key);
-                state.removeKeyVersion(key);
-            }
-        }
-        nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId));
-        return null;
-    };
-    
     private static final TopologyStateTransition 
STARTUP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> {
         int delay = 
base.get_topology_action_options().get_rebalance_options().get_wait_secs();
         nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
         return null;
     };
-    
-    private static final TopologyStateTransition DO_REBALANCE_TRANSITION = 
(args, nimbus, topoId, base) -> {
-        nimbus.doRebalance(topoId, base);
-        return Nimbus.make(base.get_prev_status());
-    };
-    
     private static final Map<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>> TOPO_STATE_TRANSITIONS =
-            new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>>()
+        new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>>()
             .put(TopologyStatus.ACTIVE, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
-                    .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
-                    .put(TopologyActions.ACTIVATE, NOOP_TRANSITION)
-                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
-                    .put(TopologyActions.KILL, KILL_TRANSITION)
-                    .build())
+                .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
+                .put(TopologyActions.ACTIVATE, NOOP_TRANSITION)
+                .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
+                .put(TopologyActions.KILL, KILL_TRANSITION)
+                .build())
             .put(TopologyStatus.INACTIVE, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
-                    .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION)
-                    .put(TopologyActions.INACTIVATE, NOOP_TRANSITION)
-                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
-                    .put(TopologyActions.KILL, KILL_TRANSITION)
-                    .build())
+                .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION)
+                .put(TopologyActions.INACTIVATE, NOOP_TRANSITION)
+                .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
+                .put(TopologyActions.KILL, KILL_TRANSITION)
+                .build())
             .put(TopologyStatus.KILLED, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
-                    .put(TopologyActions.STARTUP, 
STARTUP_WHEN_KILLED_TRANSITION)
-                    .put(TopologyActions.KILL, KILL_TRANSITION)
-                    .put(TopologyActions.REMOVE, REMOVE_TRANSITION)
-                    .build())
+                .put(TopologyActions.STARTUP, STARTUP_WHEN_KILLED_TRANSITION)
+                .put(TopologyActions.KILL, KILL_TRANSITION)
+                .put(TopologyActions.REMOVE, REMOVE_TRANSITION)
+                .build())
             .put(TopologyStatus.REBALANCING, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
-                    .put(TopologyActions.STARTUP, 
STARTUP_WHEN_REBALANCING_TRANSITION)
-                    .put(TopologyActions.KILL, KILL_TRANSITION)
-                    .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION)
-                    .build())
+                .put(TopologyActions.STARTUP, 
STARTUP_WHEN_REBALANCING_TRANSITION)
+                .put(TopologyActions.KILL, KILL_TRANSITION)
+                .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION)
+                .build())
             .build();
-    
+    private static final List<String> EMPTY_STRING_LIST = 
Collections.unmodifiableList(Collections.emptyList());
+    private static final Set<String> EMPTY_STRING_SET = 
Collections.unmodifiableSet(Collections.emptySet());
+    private static final Pattern TOPOLOGY_NAME_REGEX = 
Pattern.compile("^[^/.:\\\\]+$");
+
     // END TOPOLOGY STATE TRANSITIONS
-    
-    private static final class Assoc<K,V> implements UnaryOperator<Map<K, V>> {
-        private final K key;
-        private final V value;
-        
-        public Assoc(K key, V value) {
-            this.key = key;
-            this.value = value;
-        }
-        
-        @Override
-        public Map<K, V> apply(Map<K, V> t) {
-            Map<K, V> ret = new HashMap<>(t);
-            ret.put(key, value);
-            return ret;
-        }
+
+    static {
+        NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
+        NIMBUS_SUBJECT.setReadOnly();
     }
-    
-    private static final class Dissoc<K,V> implements UnaryOperator<Map<K, V>> 
{
-        private final K key;
-        
-        public Dissoc(K key) {
-            this.key = key;
-        }
-        
-        @Override
-        public Map<K, V> apply(Map<K, V> t) {
-            Map<K, V> ret = new HashMap<>(t);
-            ret.remove(key);
-            return ret;
-        }
+
+    private final Map<String, Object> conf;
+    private final NavigableMap<SimpleVersion, List<String>> 
supervisorClasspaths;
+    private final NimbusInfo nimbusHostPortInfo;
+    private final INimbus inimbus;
+    private final IAuthorizer impersonationAuthorizationHandler;
+    private final AtomicLong submittedCount;
+    private final IStormClusterState stormClusterState;
+    private final Object submitLock = new Object();
+    private final Object schedLock = new Object();
+    private final Object credUpdateLock = new Object();
+    private final AtomicReference<Map<String, Map<List<Integer>, Map<String, 
Object>>>> heartbeatsCache;
+    private final AtomicBoolean heartbeatsReadyFlag;
+    private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy;
+    @SuppressWarnings("deprecation")
+    private final TimeCacheMap<String, BufferInputStream> downloaders;
+    @SuppressWarnings("deprecation")
+    private final TimeCacheMap<String, WritableByteChannel> uploaders;
+    private final BlobStore blobStore;
+    private final TopoCache topoCache;
+    @SuppressWarnings("deprecation")
+    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
+    @SuppressWarnings("deprecation")
+    private final TimeCacheMap<String, OutputStream> blobUploaders;
+    @SuppressWarnings("deprecation")
+    private final TimeCacheMap<String, Iterator<String>> blobListers;
+    private final UptimeComputer uptime;
+    private final ITopologyValidator validator;
+    private final StormTimer timer;
+    private final IScheduler scheduler;
+    private final IScheduler underlyingScheduler;
+    private final ILeaderElector leaderElector;
+    private final AssignmentDistributionService assignmentsDistributer;
+    private final AtomicReference<Map<String, String>> idToSchedStatus;
+    private final AtomicReference<Map<String, SupervisorResources>> 
nodeIdToResources;
+    private final AtomicReference<Map<String, TopologyResources>> 
idToResources;
+    private final AtomicReference<Map<String, Map<WorkerSlot, 
WorkerResources>>> idToWorkerResources;
+    private final Collection<ICredentialsRenewer> credRenewers;
+    private final Object topologyHistoryLock;
+    private final LocalState topologyHistoryState;
+    private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins;
+    private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier;
+    private final List<ClusterMetricsConsumerExecutor> 
clusterConsumerExceutors;
+    private final IGroupMappingServiceProvider groupMapper;
+    private final IPrincipalToLocal principalToLocal;
+    private MetricStore metricsStore;
+    private IAuthorizer authorizationHandler;
+    //Cached CuratorFramework, mainly used for BlobStore.
+    private CuratorFramework zkClient;
+    //May be null if worker tokens are not supported by the thrift transport.
+    private WorkerTokenManager workerTokenManager;
+
+    public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
+        this(conf, inimbus, null, null, null, null, null);
     }
-    
-    @VisibleForTesting
-    public static class StandaloneINimbus implements INimbus {
 
-        @Override
-        public void prepare(Map<String, Object> topoConf, String 
schedulerLocalDir) {
-            //NOOP
-        }
+    public Nimbus(Map<String, Object> conf, INimbus inimbus, 
IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
+                  BlobStore blobStore, ILeaderElector leaderElector, 
IGroupMappingServiceProvider groupMapper) throws Exception {
+        this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, 
leaderElector, groupMapper);
+    }
 
-        @SuppressWarnings("unchecked")
-        @Override
-        public Collection<WorkerSlot> 
allSlotsAvailableForScheduling(Collection<SupervisorDetails> supervisors,
-                Topologies topologies, Set<String> 
topologiesMissingAssignments) {
-            Set<WorkerSlot> ret = new HashSet<>();
-            for (SupervisorDetails sd: supervisors) {
-                String id = sd.getId();
-                for (Number port: (Collection<Number>)sd.getMeta()) {
-                    ret.add(new WorkerSlot(id, port));
-                }
-            }
-            return ret;
+    public Nimbus(Map<String, Object> conf, INimbus inimbus, 
IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
+                  BlobStore blobStore, TopoCache topoCache, ILeaderElector 
leaderElector, IGroupMappingServiceProvider groupMapper)
+        throws Exception {
+        this.conf = conf;
+
+        this.metricsStore = null;
+        try {
+            this.metricsStore = MetricStoreConfig.configure(conf);
+        } catch (Exception e) {
+            // the metrics store is not critical to the operation of the 
cluster, allow Nimbus to come up
+            LOG.error("Failed to initialize metric store", e);
         }
 
-        @Override
-        public void assignSlots(Topologies topologies, Map<String, 
Collection<WorkerSlot>> newSlotsByTopologyId) {
-            //NOOP
+        if (hostPortInfo == null) {
+            hostPortInfo = NimbusInfo.fromConf(conf);
+        }
+        this.nimbusHostPortInfo = hostPortInfo;
+        if (inimbus != null) {
+            inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf));
         }
 
-        @Override
-        public String getHostName(Map<String, SupervisorDetails> supervisors, 
String nodeId) {
-            SupervisorDetails sd = supervisors.get(nodeId);
-            if (sd != null) {
-                return sd.getHost();
-            }
-            return null;
+        this.inimbus = inimbus;
+        this.authorizationHandler = 
StormCommon.mkAuthorizationHandler((String) 
conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf);
+        this.impersonationAuthorizationHandler =
+            StormCommon.mkAuthorizationHandler((String) 
conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
+        this.submittedCount = new AtomicLong(0);
+        if (stormClusterState == null) {
+            stormClusterState = makeStormClusterState(conf);
+        }
+        this.stormClusterState = stormClusterState;
+        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
+        this.heartbeatsReadyFlag = new AtomicBoolean(false);
+        this.heartbeatsRecoveryStrategy = 
WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
+        this.downloaders = fileCacheMap(conf);
+        this.uploaders = fileCacheMap(conf);
+        if (blobStore == null) {
+            blobStore = ServerUtils.getNimbusBlobStore(conf, 
this.nimbusHostPortInfo);
+        }
+        this.blobStore = blobStore;
+        if (topoCache == null) {
+            topoCache = new TopoCache(blobStore, conf);
+        }
+        this.topoCache = topoCache;
+        this.blobDownloaders = makeBlobCacheMap(conf);
+        this.blobUploaders = makeBlobCacheMap(conf);
+        this.blobListers = makeBlobListCacheMap(conf);
+        this.uptime = Utils.makeUptimeComputer();
+        this.validator = ReflectionUtils
+            .newInstance((String) 
conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, 
DefaultTopologyValidator.class.getName()));
+        this.timer = new StormTimer(null, (t, e) -> {
+            LOG.error("Error while processing event", e);
+            Utils.exitProcess(20, "Error while processing event");
+        });
+        this.underlyingScheduler = makeScheduler(conf, inimbus);
+        this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler);
+        this.zkClient = makeZKClient(conf);
+        if (leaderElector == null) {
+            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, 
blobStore, topoCache, stormClusterState, getNimbusAcls(conf));
         }
+        this.leaderElector = leaderElector;
+        this.assignmentsDistributer = 
AssignmentDistributionService.getInstance(conf);
+        this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
+        this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
+        this.idToResources = new AtomicReference<>(new HashMap<>());
+        this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
+        this.credRenewers = AuthUtils.GetCredentialRenewers(conf);
+        this.topologyHistoryLock = new Object();
+        this.topologyHistoryState = 
ServerConfigUtils.nimbusTopoHistoryState(conf);
+        this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf);
+        this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf);
+        this.clusterConsumerExceutors = 
makeClusterMetricsConsumerExecutors(conf);
+        if (groupMapper == null) {
+            groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
+        }
+        this.groupMapper = groupMapper;
+        this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
+            Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// 
We don't use the classpath part of this, so just an empty list
+    }
 
-        @Override
-        public IScheduler getForcedScheduler() {
-            return null;
+    private static List<ACL> getNimbusAcls(Map<String, Object> conf) {
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = ZK_ACLS;
         }
-        
+        return acls;
     }
-    
-    private static class CommonTopoInfo {
-        public Map<String, Object> topoConf;
-        public String topoName;
-        public StormTopology topology;
-        public Map<Integer, String> taskToComponent;
-        public StormBase base;
-        public int launchTimeSecs;
-        public Assignment assignment;
-        public Map<List<Integer>, Map<String, Object>> beats;
-        public HashSet<String> allComponents;
 
+    // TOPOLOGY STATE TRANSITIONS
+    private static StormBase make(TopologyStatus status) {
+        StormBase ret = new StormBase();
+        ret.set_status(status);
+        //The following are required for backwards compatibility with clojure 
code
+        ret.set_component_executors(Collections.emptyMap());
+        ret.set_component_debug(Collections.emptyMap());
+        return ret;
     }
-    
+
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> 
fileCacheMap(Map<String, Object> conf) {
         return new 
TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS),
 600),
-            (id, stream) -> {
-                try {
-                    stream.close();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
+                                  (id, stream) -> {
+                                      try {
+                                          stream.close();
+                                      } catch (Exception e) {
+                                          throw new RuntimeException(e);
+                                      }
+                                  });
     }
 
     private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> 
first, Map<? extends K, ? extends V> second) {
         Map<K, V> ret = new HashMap<>();
-        for (Entry<? extends K, ? extends V> entry: second.entrySet()) {
+        for (Entry<? extends K, ? extends V> entry : second.entrySet()) {
             if (!entry.getValue().equals(first.get(entry.getKey()))) {
                 ret.put(entry.getKey(), entry.getValue());
             }
@@ -531,30 +573,31 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     }
 
     /**
-     * Constructs a TimeCacheMap instance with a blob store timeout whose
-     * expiration callback invokes cancel on the value held by an expired 
entry when
-     * that value is an AtomicOutputStream and calls close otherwise.
+     * Constructs a TimeCacheMap instance with a blob store timeout whose 
expiration callback invokes cancel on the value held by an expired
+     * entry when that value is an AtomicOutputStream and calls close 
otherwise.
+     *
      * @param conf the config to use
      * @return the newly created map
      */
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> 
makeBlobCacheMap(Map<String, Object> conf) {
         return new 
TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS),
 600),
-            (id, stream) -> {
-                try {
-                    if (stream instanceof AtomicOutputStream) {
-                        ((AtomicOutputStream) stream).cancel();
-                    } else {
-                        stream.close();
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
+                                  (id, stream) -> {
+                                      try {
+                                          if (stream instanceof 
AtomicOutputStream) {
+                                              ((AtomicOutputStream) 
stream).cancel();
+                                          } else {
+                                              stream.close();
+                                          }
+                                      } catch (Exception e) {
+                                          throw new RuntimeException(e);
+                                      }
+                                  });
     }
-    
+
     /**
      * Constructs a TimeCacheMap instance with a blobstore timeout and no 
callback function.
+     *
      * @param conf the config to use
      * @return the newly created TimeCacheMap
      */
@@ -562,7 +605,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static TimeCacheMap<String, Iterator<String>> 
makeBlobListCacheMap(Map<String, Object> conf) {
         return new 
TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS),
 600));
     }
-    
+
     private static ITopologyActionNotifierPlugin 
createTopologyActionNotifier(Map<String, Object> conf) {
         String clazz = (String) 
conf.get(DaemonConfig.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
         ITopologyActionNotifierPlugin ret = null;
@@ -577,7 +620,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     @SuppressWarnings("unchecked")
     private static List<ClusterMetricsConsumerExecutor> 
makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
         Collection<Map<String, Object>> consumers = (Collection<Map<String, 
Object>>) conf.get(
@@ -590,16 +633,16 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private static Subject getSubject() {
         return ReqContext.context().subject();
     }
-    
+
     static Map<String, Object> readTopoConf(String topoId, TopoCache tc) 
throws KeyNotFoundException,
         AuthorizationException, IOException {
         return tc.readTopoConf(topoId, getSubject());
     }
-    
+
     static List<String> getKeyListFromId(Map<String, Object> conf, String id) {
         List<String> ret = new ArrayList<>(3);
         ret.add(ConfigUtils.masterStormCodeKey(id));
@@ -609,56 +652,57 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private static int getVersionForKey(String key, NimbusInfo nimbusInfo,
-        CuratorFramework zkClient) throws KeyNotFoundException {
+                                        CuratorFramework zkClient) throws 
KeyNotFoundException {
         KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo);
         return kseq.getKeySequenceNumber(zkClient);
     }
-    
+
     private static StormTopology readStormTopology(String topoId, TopoCache 
tc) throws KeyNotFoundException, AuthorizationException,
         IOException {
         return tc.readTopology(topoId, getSubject());
     }
-    
+
     private static Map<String, Object> readTopoConfAsNimbus(String topoId, 
TopoCache tc) throws KeyNotFoundException,
         AuthorizationException, IOException {
         return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
     }
-    
+
     private static StormTopology readStormTopologyAsNimbus(String topoId, 
TopoCache tc) throws KeyNotFoundException,
         AuthorizationException, IOException {
         return tc.readTopology(topoId, NIMBUS_SUBJECT);
     }
-    
+
     /**
-     * convert {topology-id -> SchedulerAssignment} to
-     *         {topology-id -> {executor [node port]}}.
+     * convert {topology-id -> SchedulerAssignment} to {topology-id -> 
{executor [node port]}}.
+     *
      * @return {topology-id -> {executor [node port]}} mapping
      */
-    private static Map<String, Map<List<Long>, List<Object>>> 
computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) {
+    private static Map<String, Map<List<Long>, List<Object>>> 
computeTopoToExecToNodePort(
+        Map<String, SchedulerAssignment> schedAssignments) {
         Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
-        for (Entry<String, SchedulerAssignment> schedEntry: 
schedAssignments.entrySet()) {
+        for (Entry<String, SchedulerAssignment> schedEntry : 
schedAssignments.entrySet()) {
             Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
-            for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort: 
schedEntry.getValue().getExecutorToSlot().entrySet()) {
+            for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : 
schedEntry.getValue().getExecutorToSlot().entrySet()) {
                 ExecutorDetails exec = execAndNodePort.getKey();
                 WorkerSlot slot = execAndNodePort.getValue();
-                
+
                 List<Long> listExec = new ArrayList<>(2);
                 listExec.add((long) exec.getStartTask());
                 listExec.add((long) exec.getEndTask());
-                
+
                 List<Object> nodePort = new ArrayList<>(2);
                 nodePort.add(slot.getNodeId());
-                nodePort.add((long)slot.getPort());
-                
+                nodePort.add((long) slot.getPort());
+
                 execToNodePort.put(listExec, nodePort);
             }
             ret.put(schedEntry.getKey(), execToNodePort);
         }
         return ret;
     }
-    
+
     private static int numUsedWorkers(SchedulerAssignment assignment) {
         if (assignment == null) {
             return 0;
@@ -667,8 +711,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     }
 
     /**
-     * Convert {topology-id -> SchedulerAssignment} to {topology-id -> 
{WorkerSlot WorkerResources}}. Make sure this
-     * can deal with other non-RAS schedulers later we may further support 
map-for-any-resources.
+     * Convert {topology-id -> SchedulerAssignment} to {topology-id -> 
{WorkerSlot WorkerResources}}. Make sure this can deal with other
+     * non-RAS schedulers later we may further support map-for-any-resources.
      *
      * @param schedAssignments the assignments
      * @return The resources used per slot
@@ -687,7 +731,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         Map<String, Map<List<Long>, List<Object>>> ret = 
computeTopoToExecToNodePort(schedAssignments);
         // Print some useful information
         if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry: 
ret.entrySet()) {
+            for (Entry<String, Map<List<Long>, List<Object>>> entry : 
ret.entrySet()) {
                 String topoId = entry.getKey();
                 Map<List<Long>, List<Object>> execToNodePort = 
entry.getValue();
                 Assignment assignment = existingAssignments.get(topoId);
@@ -696,12 +740,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 }
                 Map<List<Long>, NodeInfo> old = 
assignment.get_executor_node_port();
                 Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort: 
execToNodePort.entrySet()) {
+                for (Entry<List<Long>, List<Object>> execAndNodePort : 
execToNodePort.entrySet()) {
                     NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
                     String node = (String) execAndNodePort.getValue().get(0);
                     Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || 
!oldAssigned.get_node().equals(node) 
-                            || 
!port.equals(oldAssigned.get_port_iterator().next())) {
+                    if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
+                        || 
!port.equals(oldAssigned.get_port_iterator().next())) {
                         reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
                     }
                 }
@@ -716,12 +760,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> 
map, Map<List<Long>,
         List<Object>> newExecToNodePort) {
         HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null ? 
new HashMap<>() : Utils.reverseMap(map);
         HashMap<List<Object>, List<List<Long>>> slotAssigned = new HashMap<>();
-        for (Entry<NodeInfo, List<List<Long>>> entry: 
tmpSlotAssigned.entrySet()) {
+        for (Entry<NodeInfo, List<List<Long>>> entry : 
tmpSlotAssigned.entrySet()) {
             NodeInfo ni = entry.getKey();
             List<Object> key = new ArrayList<>(2);
             key.add(ni.get_node());
@@ -731,16 +775,16 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             slotAssigned.put(key, value);
         }
         HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = 
newExecToNodePort == null ? new HashMap<>() :
-                Utils.reverseMap(newExecToNodePort);
+            Utils.reverseMap(newExecToNodePort);
         HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new 
HashMap<>();
-        for (Entry<List<Object>, List<List<Long>>> entry: 
tmpNewSlotAssigned.entrySet()) {
+        for (Entry<List<Object>, List<List<Long>>> entry : 
tmpNewSlotAssigned.entrySet()) {
             List<List<Long>> value = new ArrayList<>(entry.getValue());
             value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
             newSlotAssigned.put(entry.getKey(), value);
         }
         Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, 
newSlotAssigned);
         List<List<Long>> ret = new ArrayList<>();
-        for (List<List<Long>> val: diff.values()) {
+        for (List<List<Long>> val : diff.values()) {
             ret.addAll(val);
         }
         return ret;
@@ -751,27 +795,27 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         Set<NodeInfo> niRet = new 
HashSet<>(current.get_executor_node_port().values());
         niRet.removeAll(oldSlots);
         Set<WorkerSlot> ret = new HashSet<>();
-        for (NodeInfo ni: niRet) {
+        for (NodeInfo ni : niRet) {
             ret.add(new WorkerSlot(ni.get_node(), 
ni.get_port_iterator().next()));
         }
         return ret;
     }
-    
+
     private static Map<String, SupervisorDetails> 
basicSupervisorDetailsMap(IStormClusterState state) {
         Map<String, SupervisorDetails> ret = new HashMap<>();
-        for (Entry<String, SupervisorInfo> entry: 
state.allSupervisorInfo().entrySet()) {
+        for (Entry<String, SupervisorInfo> entry : 
state.allSupervisorInfo().entrySet()) {
             String id = entry.getKey();
             SupervisorInfo info = entry.getValue();
             ret.put(id, new SupervisorDetails(id, info.get_server_port(), 
info.get_hostname(),
-                    info.get_scheduler_meta(), null, 
info.get_resources_map()));
+                                              info.get_scheduler_meta(), null, 
info.get_resources_map()));
         }
         return ret;
     }
-    
+
     private static boolean isTopologyActive(IStormClusterState state, String 
topoName) {
         return state.getTopoId(topoName).isPresent();
     }
-    
+
     private static Map<String, Object> tryReadTopoConf(String topoId, 
TopoCache tc)
         throws NotAliveException, AuthorizationException, IOException {
         try {
@@ -785,10 +829,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             throw new NotAliveException(topoId);
         }
     }
-    
-    private static final List<String> EMPTY_STRING_LIST = 
Collections.unmodifiableList(Collections.emptyList());
-    private static final Set<String> EMPTY_STRING_SET = 
Collections.unmodifiableSet(Collections.emptySet());
-    
+
     @VisibleForTesting
     public static Set<String> topoIdsToClean(IStormClusterState state, 
BlobStore store) {
         Set<String> ret = new HashSet<>();
@@ -800,7 +841,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST));
         return ret;
     }
-    
+
     private static String extractStatusStr(StormBase base) {
         String ret = null;
         if (base != null) {
@@ -815,7 +856,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static StormTopology normalizeTopology(Map<String, Object> 
topoConf, StormTopology topology)
         throws InvalidTopologyException {
         StormTopology ret = topology.deepCopy();
-        for (Object comp: StormCommon.allComponents(ret).values()) {
+        for (Object comp : StormCommon.allComponents(ret).values()) {
             Map<String, Object> mergedConf = StormCommon.componentConf(comp);
             mergedConf.put(Config.TOPOLOGY_TASKS, 
ServerUtils.getComponentParallelism(topoConf, comp));
             String jsonConf = JSONValue.toJSONString(mergedConf);
@@ -823,47 +864,47 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private static void addToDecorators(Set<String> decorators, List<String> 
conf) {
         if (conf != null) {
             decorators.addAll(conf);
         }
     }
-    
+
     @SuppressWarnings("unchecked")
     private static void addToSerializers(Map<String, String> ser, List<Object> 
conf) {
         if (conf != null) {
-            for (Object o: conf) {
+            for (Object o : conf) {
                 if (o instanceof Map) {
-                    ser.putAll((Map<String,String>)o);
+                    ser.putAll((Map<String, String>) o);
                 } else {
-                    ser.put((String)o, null);
+                    ser.put((String) o, null);
                 }
             }
         }
     }
-    
+
     @SuppressWarnings("unchecked")
-    private static Map<String, Object> normalizeConf(Map<String,Object> conf, 
Map<String, Object> topoConf, StormTopology topology) {
+    private static Map<String, Object> normalizeConf(Map<String, Object> conf, 
Map<String, Object> topoConf, StormTopology topology) {
         //ensure that serializations are same for all tasks no matter what's on
         // the supervisors. this also allows you to declare the serializations 
as a sequence
         List<Map<String, Object>> allConfs = new ArrayList<>();
-        for (Object comp: StormCommon.allComponents(topology).values()) {
+        for (Object comp : StormCommon.allComponents(topology).values()) {
             allConfs.add(StormCommon.componentConf(comp));
         }
 
         Set<String> decorators = new HashSet<>();
         //Yes we are putting in a config that is not the same type we pulled 
out.
         Map<String, String> serializers = new HashMap<>();
-        for (Map<String, Object> c: allConfs) {
+        for (Map<String, Object> c : allConfs) {
             addToDecorators(decorators, (List<String>) 
c.get(Config.TOPOLOGY_KRYO_DECORATORS));
             addToSerializers(serializers, (List<Object>) 
c.get(Config.TOPOLOGY_KRYO_REGISTER));
         }
-        addToDecorators(decorators, 
(List<String>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS, 
-                conf.get(Config.TOPOLOGY_KRYO_DECORATORS)));
-        addToSerializers(serializers, 
(List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, 
-                conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
-        
+        addToDecorators(decorators, (List<String>) 
topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS,
+                                                                         
conf.get(Config.TOPOLOGY_KRYO_DECORATORS)));
+        addToSerializers(serializers, (List<Object>) 
topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER,
+                                                                           
conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
+
         Map<String, Object> mergedConf = Utils.merge(conf, topoConf);
         Map<String, Object> ret = new HashMap<>(topoConf);
         ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers);
@@ -873,7 +914,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 
mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
         return ret;
     }
-    
+
     private static void rmBlobKey(BlobStore store, String key, 
IStormClusterState state) {
         try {
             store.deleteBlob(key, NIMBUS_SUBJECT);
@@ -885,10 +926,11 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             LOG.info("Exception {}", e);
         }
     }
-    
+
     /**
      * Deletes jar files in dirLoc older than seconds.
-     * @param dirLoc the location to look in for file
+     *
+     * @param dirLoc  the location to look in for file
      * @param seconds how old is too old and should be deleted
      */
     @VisibleForTesting
@@ -904,12 +946,10 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             }
         }
     }
-    
+
     private static ExecutorInfo toExecInfo(List<Long> exec) {
         return new ExecutorInfo(exec.get(0).intValue(), 
exec.get(1).intValue());
     }
-    
-    private static final Pattern TOPOLOGY_NAME_REGEX = 
Pattern.compile("^[^/.:\\\\]+$");
 
     private static void validateTopologyName(String name) throws 
InvalidTopologyException {
         Matcher m = TOPOLOGY_NAME_REGEX.matcher(name);
@@ -919,16 +959,16 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     }
 
     private static StormTopology tryReadTopology(String topoId, TopoCache tc)
-            throws NotAliveException, AuthorizationException, IOException {
+        throws NotAliveException, AuthorizationException, IOException {
         try {
             return readStormTopologyAsNimbus(topoId, tc);
         } catch (KeyNotFoundException e) {
             throw new NotAliveException(topoId);
         }
     }
-    
+
     private static void validateTopologySize(Map<String, Object> topoConf, 
Map<String, Object> nimbusConf,
-        StormTopology topology) throws InvalidTopologyException {
+                                             StormTopology topology) throws 
InvalidTopologyException {
         int workerCount = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 1);
         Integer allowedWorkers = 
ObjectReader.getInt(nimbusConf.get(DaemonConfig.NIMBUS_SLOTS_PER_TOPOLOGY), 
null);
         int executorsCount = 0;
@@ -938,15 +978,15 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         Integer allowedExecutors = 
ObjectReader.getInt(nimbusConf.get(DaemonConfig.NIMBUS_EXECUTORS_PER_TOPOLOGY), 
null);
         if (allowedExecutors != null && executorsCount > allowedExecutors) {
             throw new InvalidTopologyException("Failed to submit topology. 
Topology requests more than " +
-                    allowedExecutors + " executors.");
+                                               allowedExecutors + " 
executors.");
         }
-        
+
         if (allowedWorkers != null && workerCount > allowedWorkers) {
             throw new InvalidTopologyException("Failed to submit topology. 
Topology requests more than " +
-                    allowedWorkers + " workers.");
+                                               allowedWorkers + " workers.");
         }
     }
-    
+
     private static void setLoggerTimeouts(LogLevel level) {
         int timeoutSecs = level.get_reset_log_level_timeout_secs();
         if (timeoutSecs > 0) {
@@ -955,45 +995,45 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             level.unset_reset_log_level_timeout_epoch();
         }
     }
-    
+
     @VisibleForTesting
     public static List<String> topologiesOnSupervisor(Map<String, Assignment> 
assignments, String supervisorId) {
         Set<String> ret = new HashSet<>();
-        for (Entry<String, Assignment> entry: assignments.entrySet()) {
+        for (Entry<String, Assignment> entry : assignments.entrySet()) {
             Assignment assignment = entry.getValue();
-            for (NodeInfo nodeInfo: 
assignment.get_executor_node_port().values()) {
+            for (NodeInfo nodeInfo : 
assignment.get_executor_node_port().values()) {
                 if (supervisorId.equals(nodeInfo.get_node())) {
                     ret.add(entry.getKey());
                     break;
                 }
             }
         }
-        
+
         return new ArrayList<>(ret);
     }
-    
+
     private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() {
         return new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs());
     }
-    
+
     private static List<DataPoint> extractClusterMetrics(ClusterSummary summ) {
         List<DataPoint> ret = new ArrayList<>();
         ret.add(new DataPoint("supervisors", summ.get_supervisors_size()));
         ret.add(new DataPoint("topologies", summ.get_topologies_size()));
-        
+
         int totalSlots = 0;
         int usedSlots = 0;
-        for (SupervisorSummary sup: summ.get_supervisors()) {
+        for (SupervisorSummary sup : summ.get_supervisors()) {
             usedSlots += sup.get_num_used_workers();
             totalSlots += sup.get_num_workers();
         }
         ret.add(new DataPoint("slotsTotal", totalSlots));
         ret.add(new DataPoint("slotsUsed", usedSlots));
         ret.add(new DataPoint("slotsFree", totalSlots - usedSlots));
-        
+
         int totalExecutors = 0;
         int totalTasks = 0;
-        for (TopologySummary topo: summ.get_topologies()) {
+        for (TopologySummary topo : summ.get_topologies()) {
             totalExecutors += topo.get_num_executors();
             totalTasks += topo.get_num_tasks();
         }
@@ -1004,8 +1044,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
 
     private static Map<IClusterMetricsConsumer.SupervisorInfo, 
List<DataPoint>> extractSupervisorMetrics(ClusterSummary summ) {
         Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> ret = new 
HashMap<>();
-        for (SupervisorSummary sup: summ.get_supervisors()) {
-            IClusterMetricsConsumer.SupervisorInfo info = new 
IClusterMetricsConsumer.SupervisorInfo(sup.get_host(), sup.get_supervisor_id(), 
Time.currentTimeSecs());
+        for (SupervisorSummary sup : summ.get_supervisors()) {
+            IClusterMetricsConsumer.SupervisorInfo info =
+                new IClusterMetricsConsumer.SupervisorInfo(sup.get_host(), 
sup.get_supervisor_id(), Time.currentTimeSecs());
             List<DataPoint> metrics = new ArrayList<>();
             metrics.add(new DataPoint("slotsTotal", sup.get_num_workers()));
             metrics.add(new DataPoint("slotsUsed", 
sup.get_num_used_workers()));
@@ -1017,7 +1058,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private static void setResourcesDefaultIfNotSet(Map<String, 
NormalizedResourceRequest> compResourcesMap, String compId,
                                                     Map<String, Object> 
topoConf) {
         NormalizedResourceRequest resources = compResourcesMap.get(compId);
@@ -1025,7 +1066,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             compResourcesMap.put(compId, new 
NormalizedResourceRequest(topoConf));
         }
     }
-    
+
     private static void validatePortAvailable(Map<String, Object> conf) throws 
IOException {
         int port = ObjectReader.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
         try (ServerSocket socket = new ServerSocket(port)) {
@@ -1035,7 +1076,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             System.exit(0);
         }
     }
-    
+
     private static Nimbus launchServer(Map<String, Object> conf, INimbus 
inimbus) throws Exception {
         StormCommon.validateDistributedMode(conf);
         validatePortAvailable(conf);
@@ -1056,7 +1097,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
 
     public static Nimbus launch(INimbus inimbus) throws Exception {
         Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
-                ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
+                                               
ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
         boolean fixupAcl = (boolean) 
conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
         boolean checkAcl = fixupAcl || (boolean) 
conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
         if (checkAcl) {
@@ -1064,168 +1105,165 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return launchServer(conf, inimbus);
     }
-    
+
     public static void main(String[] args) throws Exception {
         Utils.setupDefaultUncaughtExceptionHandler();
         launch(new StandaloneINimbus());
     }
-    
-    private final Map<String, Object> conf;
-    private MetricStore metricsStore;
-    private final NavigableMap<SimpleVersion, List<String>> 
supervisorClasspaths;
-    private final NimbusInfo nimbusHostPortInfo;
-    private final INimbus inimbus;
-    private IAuthorizer authorizationHandler;
-    private final IAuthorizer impersonationAuthorizationHandler;
-    private final AtomicLong submittedCount;
-    //Cached CuratorFramework, mainly used for BlobStore.
-    private CuratorFramework zkClient;
-    private final IStormClusterState stormClusterState;
-    private final Object submitLock = new Object();
-    private final Object schedLock = new Object();
-    private final Object credUpdateLock = new Object();
-    private final AtomicReference<Map<String, Map<List<Integer>, Map<String, 
Object>>>> heartbeatsCache;
-    private final AtomicBoolean heartbeatsReadyFlag;
-    private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy;
-    @SuppressWarnings("deprecation")
-    private final TimeCacheMap<String, BufferInputStream> downloaders;
-    @SuppressWarnings("deprecation")
-    private final TimeCacheMap<String, WritableByteChannel> uploaders;
-    private final BlobStore blobStore;
-    private final TopoCache topoCache;
-    @SuppressWarnings("deprecation")
-    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
-    @SuppressWarnings("deprecation")
-    private final TimeCacheMap<String, OutputStream> blobUploaders;
-    @SuppressWarnings("deprecation")
-    private final TimeCacheMap<String, Iterator<String>> blobListers;
-    private final UptimeComputer uptime;
-    private final ITopologyValidator validator;
-    private final StormTimer timer;
-    private final IScheduler scheduler;
-    private final IScheduler underlyingScheduler;
-    private final ILeaderElector leaderElector;
-    private final AssignmentDistributionService assignmentsDistributer;
-    private final AtomicReference<Map<String, String>> idToSchedStatus;
-    private final AtomicReference<Map<String, SupervisorResources>> 
nodeIdToResources;
-    private final AtomicReference<Map<String, TopologyResources>> 
idToResources;
-    private final AtomicReference<Map<String, Map<WorkerSlot, 
WorkerResources>>> idToWorkerResources;
-    private final Collection<ICredentialsRenewer> credRenewers;
-    private final Object topologyHistoryLock;
-    private final LocalState topologyHistoryState;
-    private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins;
-    private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier;
-    private final List<ClusterMetricsConsumerExecutor> 
clusterConsumerExceutors;
-    private final IGroupMappingServiceProvider groupMapper;
-    private final IPrincipalToLocal principalToLocal;
-    //May be null if worker tokens are not supported by the thrift transport.
-    private WorkerTokenManager workerTokenManager;
 
-    private static CuratorFramework makeZKClient(Map<String, Object> conf) {
-        List<String> servers = 
(List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        String root = (String)conf.get(Config.STORM_ZOOKEEPER_ROOT);
-        CuratorFramework ret = null;
-        if (servers != null && port != null) {
-            ret = ClientZookeeper.mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), conf, DaemonType.NIMBUS);
+    private static CuratorFramework makeZKClient(Map<String, Object> conf) {
+        List<String> servers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        String root = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+        CuratorFramework ret = null;
+        if (servers != null && port != null) {
+            ret = ClientZookeeper.mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), conf, DaemonType.NIMBUS);
+        }
+        return ret;
+    }
+
+    private static IStormClusterState makeStormClusterState(Map<String, 
Object> conf) throws Exception {
+        return ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
+    }
+
+    private static List<Integer> asIntExec(List<Long> exec) {
+        List<Integer> ret = new ArrayList<>(2);
+        ret.add(exec.get(0).intValue());
+        ret.add(exec.get(1).intValue());
+        return ret;
+    }
+
+    /**
+     * Diff old/new assignment to find nodes which assigned assignments has 
changed.
+     *
+     * @param oldAss old assigned assignment
+     * @param newAss new assigned assignment
+     * @return nodeId -> host map of assignments changed nodes
+     */
+    private static Map<String, String> assignmentChangedNodes(Assignment 
oldAss, Assignment newAss) {
+        Map<List<Long>, NodeInfo> oldExecutorNodePort = null;
+        Map<List<Long>, NodeInfo> newExecutorNodePort = null;
+        Map<String, String> allNodeHost = new HashMap<>();
+        if (oldAss != null) {
+            oldExecutorNodePort = oldAss.get_executor_node_port();
+            allNodeHost.putAll(oldAss.get_node_host());
+        }
+        if (newAss != null) {
+            newExecutorNodePort = newAss.get_executor_node_port();
+            allNodeHost.putAll(newAss.get_node_host());
+        }
+        //kill or newly submit
+        if (oldAss == null || newAss == null) {
+            return allNodeHost;
+        } else {
+            // rebalance
+            Map<String, String> ret = new HashMap();
+            for (Map.Entry<List<Long>, NodeInfo> entry : 
newExecutorNodePort.entrySet()) {
+                NodeInfo newNodeInfo = entry.getValue();
+                NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey());
+                if (null != oldNodeInfo) {
+                    if (!oldNodeInfo.equals(newNodeInfo)) {
+                        ret.put(oldNodeInfo.get_node(), 
allNodeHost.get(oldNodeInfo.get_node()));
+                        ret.put(newNodeInfo.get_node(), 
allNodeHost.get(newNodeInfo.get_node()));
+                    }
+                } else {
+                    ret.put(newNodeInfo.get_node(), 
allNodeHost.get(newNodeInfo.get_node()));
+                }
+            }
+
+            return ret;
         }
-        return ret;
-    }
-    
-    private static IStormClusterState makeStormClusterState(Map<String, 
Object> conf) throws Exception {
-        return ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
     }
-    
-    public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
-        this(conf, inimbus, null, null, null, null, null);
+
+    /**
+     * Pick out assignments for specific node from all assignments.
+     *
+     * @param assignmentMap stormId -> assignment map
+     * @param nodeId        supervisor/node id
+     * @return stormId -> assignment map for the node
+     */
+    private static Map<String, Assignment> assignmentsForNode(Map<String, 
Assignment> assignmentMap, String nodeId) {
+        Map<String, Assignment> ret = new HashMap<>();
+        assignmentMap.entrySet().stream().filter(assignmentEntry -> 
assignmentEntry.getValue().get_node_host().keySet()
+                                                                               
    .contains(nodeId))
+                     .forEach(assignmentEntry -> { 
ret.put(assignmentEntry.getKey(), assignmentEntry.getValue()); });
+
+        return ret;
     }
-    
-    public Nimbus(Map<String, Object> conf, INimbus inimbus, 
IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
-                  BlobStore blobStore, ILeaderElector leaderElector, 
IGroupMappingServiceProvider groupMapper) throws Exception {
-        this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, 
leaderElector, groupMapper);
+
+    /**
+     * Notify supervisors/nodes assigned assignments.
+     *
+     * @param assignments       assignments map for nodes
+     * @param service           {@link AssignmentDistributionService} for 
distributing assignments asynchronous
+     * @param nodeHost          node -> host map
+     * @param supervisorDetails nodeId -> {@link SupervisorDetails} map
+     */
+    private static void notifySupervisorsAssignments(Map<String, Assignment> 
assignments,
+                                                     
AssignmentDistributionService service, Map<String, String> nodeHost,
+                                                     Map<String, 
SupervisorDetails> supervisorDetails) {
+        for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
+            try {
+                String nodeId = nodeEntry.getKey();
+                SupervisorAssignments supervisorAssignments = new 
SupervisorAssignments();
+                
supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, 
nodeEntry.getKey()));
+                SupervisorDetails details = supervisorDetails.get(nodeId);
+                Integer serverPort = details != null ? details.getServerPort() 
: null;
+                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), 
serverPort, supervisorAssignments);
+            } catch (Throwable tr1) {
+                //just skip when any error happens wait for next round 
assignments reassign
+                LOG.error("Exception when add assignments distribution task 
for node {}", nodeEntry.getKey());
+            }
+        }
     }
 
-    public Nimbus(Map<String, Object> conf, INimbus inimbus, 
IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
-            BlobStore blobStore, TopoCache topoCache, ILeaderElector 
leaderElector, IGroupMappingServiceProvider groupMapper)
-        throws Exception {
-        this.conf = conf;
+    private static void notifySupervisorsAsKilled(IStormClusterState 
clusterState, Assignment oldAss,
+                                                  
AssignmentDistributionService service) {
+        Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
+        notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, 
nodeHost,
+                                     basicSupervisorDetailsMap(clusterState));
+    }
 
-        this.metricsStore = null;
-        try {
-            this.metricsStore = MetricStoreConfig.configure(conf);
-        } catch (Exception e) {
-            // the metrics store is not critical to the operation of the 
cluster, allow Nimbus to come up
-            LOG.error("Failed to initialize metric store", e);
+    @VisibleForTesting
+    static void validateTopologyWorkerMaxHeapSizeConfigs(
+        Map<String, Object> stormConf, StormTopology topology, double 
defaultWorkerMaxHeapSizeMB) {
+        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, 
stormConf);
+        double topologyWorkerMaxHeapSize =
+            
ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), 
defaultWorkerMaxHeapSizeMB);
+        if (topologyWorkerMaxHeapSize < largestMemReq) {
+            throw new IllegalArgumentException(
+                "Topology will not be able to be successfully scheduled: 
Config "
+                + "TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
+                + topologyWorkerMaxHeapSize
+                + " < " + largestMemReq + " (Largest memory requirement of a 
component in the topology)."
+                + " Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger 
amount");
         }
+    }
 
-        if (hostPortInfo == null) {
-            hostPortInfo = NimbusInfo.fromConf(conf);
-        }
-        this.nimbusHostPortInfo = hostPortInfo;
-        if (inimbus != null) {
-            inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf));
-        }
-        
-        this.inimbus = inimbus;
-        this.authorizationHandler = 
StormCommon.mkAuthorizationHandler((String) 
conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf);
-        this.impersonationAuthorizationHandler = 
StormCommon.mkAuthorizationHandler((String) 
conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
-        this.submittedCount = new AtomicLong(0);
-        if (stormClusterState == null) {
-            stormClusterState =  makeStormClusterState(conf);
-        }
-        this.stormClusterState = stormClusterState;
-        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
-        this.heartbeatsReadyFlag = new AtomicBoolean(false);
-        this.heartbeatsRecoveryStrategy = 
WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
-        this.downloaders = fileCacheMap(conf);
-        this.uploaders = fileCacheMap(conf);
-        if (blobStore == null) {
-            blobStore = ServerUtils.getNimbusBlobStore(conf, 
this.nimbusHostPortInfo);
-        }
-        this.blobStore = blobStore;
-        if (topoCache == null) {
-            topoCache = new TopoCache(blobStore, conf);
-        }
-        this.topoCache = topoCache;
-        this.blobDownloaders = makeBlobCacheMap(conf);
-        this.blobUploaders = makeBlobCacheMap(conf);
-        this.blobListers = makeBlobListCacheMap(conf);
-        this.uptime = Utils.makeUptimeComputer();
-        this.validator = ReflectionUtils.newInstance((String) 
conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, 
DefaultTopologyValidator.class.getName()));
-        this.timer = new StormTimer(null, (t, e) -> {
-            LOG.error("Error while processing event", e);
-            Utils.exitProcess(20, "Error while processing event");
-        });
-        this.underlyingScheduler = makeScheduler(conf, inimbus);
-        this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler);
-        this.zkClient = makeZKClient(conf);
-        if (leaderElector == null) {
-            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, 
blobStore, topoCache, stormClusterState, getNimbusAcls(conf));
+    private static double getMaxExecutorMemoryUsageForTopo(
+        StormTopology topology, Map<String, Object> topologyConf) {
+        double largestMemoryOperator = 0.0;
+        for (NormalizedResourceRequest entry :
+            ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
+            double memoryRequirement = entry.getTotalMemoryMb();
+            if (memoryRequirement > largestMemoryOperator) {
+                largestMemoryOperator = memoryRequirement;
+            }
         }
-        this.leaderElector = leaderElector;
-        this.assignmentsDistributer = 
AssignmentDistributionService.getInstance(conf);
-        this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
-        this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
-        this.idToResources = new AtomicReference<>(new HashMap<>());
-        this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
-        this.credRenewers = AuthUtils.GetCredentialRenewers(conf);
-        this.topologyHistoryLock = new Object();
-        this.topologyHistoryState = 
ServerConfigUtils.nimbusTopoHistoryState(conf);
-        this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf);
-        this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf);
-        this.clusterConsumerExceutors = 
makeClusterMetricsConsumerExecutors(conf);
-        if (groupMapper == null) {
-            groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
+        for (NormalizedResourceRequest entry :
+            ResourceUtils.getSpoutsResources(topology, topologyConf).values()) 
{
+            double memoryRequirement = entry.getTotalMemoryMb();
+            if (memoryRequirement > largestMemoryOperator) {
+                largestMemoryOperator = memoryRequirement;
+            }
         }
-        this.groupMapper = groupMapper;
-        this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
-        this.supervisorClasspaths = 
Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, 
EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an 
empty list
+        return largestMemoryOperator;
     }
 
     Map<String, Object> getConf() {
         return conf;
     }
-    
+
     @VisibleForTesting
     public void setAuthorizationHandler(IAuthorizer authorizationHandler) {
         this.authorizationHandler = authorizationHandler;
@@ -1238,9 +1276,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private AssignmentDistributionService getAssignmentsDistributer() {
         return assignmentsDistributer;
     }
-    
+
     @VisibleForTesting
-    public AtomicReference<Map<String,Map<List<Integer>,Map<String,Object>>>> 
getHeartbeatsCache() {
+    public AtomicReference<Map<String, Map<List<Integer>, Map<String, 
Object>>>> getHeartbeatsCache() {
         return heartbeatsCache;
     }
 
@@ -1262,26 +1300,27 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private boolean isLeader() throws Exception {
         return leaderElector.isLeader();
     }
-    
+
     private void assertIsLeader() throws Exception {
         if (!isLeader()) {
             NimbusInfo leaderAddress = leaderElector.getLeader();
             throw new RuntimeException("not a leader, current leader is " + 
leaderAddress);
         }
     }
-    
+
     private String getInbox() throws IOException {
         return ServerConfigUtils.masterInbox(conf);
     }
 
     /**
      * Used for local cluster.
+     *
      * @param supervisor {@link org.apache.storm.daemon.supervisor.Supervisor}
      */
     public void addSupervisor(org.apache.storm.daemon.supervisor.Supervisor 
supervisor) {
         assignmentsDistributer.addLocalSupervisor(supervisor);
     }
-    
+
     void delayEvent(String topoId, int delaySecs, TopologyActions event, 
Object args) {
         LOG.info("Delaying event {} for {} secs for {}", event, delaySecs, 
topoId);
         timer.schedule(delaySecs, () -> {
@@ -1298,11 +1337,11 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         StormBase updated = new StormBase();
         updated.set_topology_action_options(null);
         updated.set_component_debug(Collections.emptyMap());
-        
+
         if (rbo.is_set_num_executors()) {
             updated.set_component_executors(rbo.get_num_executors());
         }
-        
+
         if (rbo.is_set_num_workers()) {
             updated.set_num_workers(rbo.get_num_workers());
         }
@@ -1310,12 +1349,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         updateBlobStore(topoId, rbo, 
ServerUtils.principalNameToSubject(rbo.get_principal()));
         mkAssignments(topoId);
     }
-    
+
     private String toTopoId(String topoName) throws NotAliveException {
         return stormClusterState.getTopoId(topoName)
-                .orElseThrow(() -> new NotAliveException(topoName + " is not 
alive"));
+                                .orElseThrow(() -> new 
NotAliveException(topoName + " is not alive"));
     }
-    
+
     private void transitionName(String topoName, TopologyActions event, Object 
eventArg, boolean errorOnNoTransition) throws Exception {
         transition(toTopoId(topoName), event, eventArg, errorOnNoTransition);
     }
@@ -1323,12 +1362,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private void transition(String topoId, TopologyActions event, Object 
eventArg) throws Exception {
         transition(topoId, event, eventArg, false);
     }
-    
+
     private void transition(String topoId, TopologyActions event, Object 
eventArg, boolean errorOnNoTransition)
         throws Exception {
         LOG.info("TRANSITION: {} {} {} {}", topoId, event, eventArg, 
errorOnNoTransition);
         assertIsLeader();
-        synchronized(submitLock) {
+        synchronized (submitLock) {
             IStormClusterState clusterState = stormClusterState;
             StormBase base = clusterState.stormBase(topoId, null);
             if (base == null || base.get_status() == null) {
@@ -1341,7 +1380,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                     if (errorOnNoTransition) {
                         throw new RuntimeException(message);
                     }
-                    
+
                     if (TopologyActions.STARTUP != event) {
                         //STARTUP is a system event so don't log an issue
                         LOG.info(message);
@@ -1355,9 +1394,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             }
         }
     }
-    
+
     private void setupStormCode(Map<String, Object> conf, String topoId, 
String tmpJarLocation,
-        Map<String, Object> topoConf, StormTopology topology) throws Exception 
{
+                                Map<String, Object> topoConf, StormTopology 
topology) throws Exception {
         Subject subject = getSubject();
         IStormClusterState clusterState = stormClusterState;
         BlobStore store = blobStore;
@@ -1398,7 +1437,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         throws AuthorizationException, IOException, KeyNotFoundException {
         Map<String, Object> topoConf = new 
HashMap<>(topoCache.readTopoConf(topoId, subject)); //Copy the data
         topoConf.putAll(configOverride);
-        topoCache.updateTopoConf(topoId, subject,  topoConf);
+        topoCache.updateTopoConf(topoId, subject, topoConf);
     }
 
     private void updateBlobStore(String topoId, RebalanceOptions rbo, Subject 
subject)
@@ -1412,7 +1451,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             updateTopologyConf(topoId, Utils.parseJson(confOverride), subject);
         }
     }
-    
+
     private Integer getBlobReplicationCount(String key) throws Exception {
         BlobStore store = blobStore;
         if (store != null) {
@@ -1420,7 +1459,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return null;
     }
-    
+
     private void waitForDesiredCodeReplication(Map<String, Object> topoConf, 
String topoId) throws Exception {
         int minReplicationCount = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MIN_REPLICATION_COUNT));
         int maxWaitTime = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC));
@@ -1434,12 +1473,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         //When is this ever null?
         if (blobStore != null) {
             while (jarCount < minReplicationCount &&
-                    codeCount < minReplicationCount &&
-                    confCount < minReplicationCount) {
+                   codeCount < minReplicationCount &&
+                   confCount < minReplicationCount) {
                 if (maxWaitTime > 0 && totalWaitTime > maxWaitTime) {
                     LOG.info("desired replication count of {} not achieved but 
we have hit the max wait time {}"
-                            + " so moving on with replication count for conf 
key = {} for code key = {} for jar key = ",
-                            minReplicationCount, maxWaitTime, confCount, 
codeCount, jarCount);
+                             + " so moving on with replication count for conf 
key = {} for code key = {} for jar key = ",
+                             minReplicationCount, maxWaitTime, confCount, 
codeCount, jarCount);
                     return;
                 }
                 LOG.debug("Checking if I am still the leader");
@@ -1456,10 +1495,10 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             }
         }
         LOG.info("desired replication count {} achieved, 
current-replication-count for conf key = {},"
-                + " current-replication-count for code key = {}, 
current-replication-count for jar key = {}", 
-                minReplicationCount, confCount, codeCount, jarCount);
+                 + " current-replication-count for code key = {}, 
current-replication-count for jar key = {}",
+                 minReplicationCount, confCount, codeCount, jarCount);
     }
-    
+
     private TopologyDetails readTopologyDetails(String topoId, StormBase base) 
throws KeyNotFoundException,
         AuthorizationException, IOException, InvalidTopologyException {
         assert (base != null);
@@ -1473,44 +1512,49 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         Map<List<Integer>, String> rawExecToComponent = 
computeExecutorToComponent(topoId, base, topoConf, topo);
         Map<ExecutorDetails, String> executorsToComponent = new HashMap<>();
-        for (Entry<List<Integer>, String> entry: 
rawExecToComponent.entrySet()) {
+        for (Entry<List<Integer>, String> entry : 
rawExecToComponent.entrySet()) {
             List<Integer> execs = entry.getKey();
             ExecutorDetails execDetails = new ExecutorDetails(execs.get(0), 
execs.get(1));
             executorsToComponent.put(execDetails, entry.getValue());
         }
-        
+
         return new TopologyDetails(topoId, topoConf, topo, 
base.get_num_workers(), executorsToComponent, base.get_launch_time_secs(),
-            base.get_owner());
+                                   base.get_owner());
     }
 
     private void updateHeartbeatsFromZkHeartbeat(String topoId, 
Set<List<Integer>> allExecutors, Assignment existingAssignment) {
         LOG.debug("Updating heartbeats for {} {} (from ZK heartbeat)", topoId, 
allExecutors);
         IStormClusterState state = stormClusterState;
-        Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertExecutorBeats(state.executorBeats(topoId, 
existingAssignment.get_executor_node_port()));
+        Map<List<Integer>, Map<String, Object>> executorBeats =
+            StatsUtil.convertExecutorBeats(state.executorBeats(topoId, 
existingAssignment.get_executor_node_port()));
         Map<List<Integer>, Map<String, Object>> cache = 
StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId),
-                executorBeats, allExecutors, 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+                                                                               
                       executorBeats, allExecutors,
+                                                                               
                       ObjectReader.getInt(conf.get(
+                                                                               
                           DaemonConfig
+                                                                               
                               .NIMBUS_TASK_TIMEOUT_SECS)));
         heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
     }
 
     private void updateHeartbeats(String topoId, Set<List<Integer>> 
allExecutors, Assignment existingAssignment) {
         LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
         Map<List<Integer>, Map<String, Object>> cache = 
heartbeatsCache.get().get(topoId);
-        if(cache == null) {
+        if (cache == null) {
             cache = new HashMap<>();
             heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
         }
         StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
-            null, allExecutors, 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+                                       null, allExecutors, 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
     }
 
     /**
      * Update all the heartbeats for all the topologies' executors.
+     *
      * @param existingAssignments current assignments (thrift)
      * @param topologyToExecutors topology ID to executors.
      */
     private void updateAllHeartbeats(Map<String, Assignment> 
existingAssignments,
-        Map<String, Set<List<Integer>>> topologyToExecutors, Set<String> 
zkHeartbeatTopologies) {
-        for (Entry<String, Assignment> entry: existingAssignments.entrySet()) {
+                                     Map<String, Set<List<Integer>>> 
topologyToExecutors, Set<String> zkHeartbeatTopologies) {
+        for (Entry<String, Assignment> entry : existingAssignments.entrySet()) 
{
             String topoId = entry.getKey();
             if (zkHeartbeatTopologies.contains(topoId)) {
                 updateHeartbeatsFromZkHeartbeat(topoId, 
topologyToExecutors.get(topoId), entry.getValue());
@@ -1524,42 +1568,43 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertWorkerBeats(workerHeartbeat);
         String topoId = workerHeartbeat.get_storm_id();
         Map<List<Integer>, Map<String, Object>> cache = 
heartbeatsCache.get().get(topoId);
-        if(cache == null) {
+        if (cache == null) {
             cache = new HashMap<>();
             heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
         }
         Set<List<Integer>> executors = new HashSet<>();
-        for(ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
             executors.add(Arrays.asList(executorInfo.get_task_start(), 
executorInfo.get_task_end()));
         }
 
         StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), 
executorBeats, executors,
-                
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+                                       
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
 
     }
 
     private void 
updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats 
workerHeartbeats) {
         
workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
-        if(!heartbeatsReadyFlag.get() && 
!Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
+        if (!heartbeatsReadyFlag.get() && 
!Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
             
heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
         }
     }
 
     /**
-     * Decide if the heartbeats is recovered for a master, will wait for all 
the assignments nodes to recovery,
-     * every node will take care its node heartbeats reporting.
+     * Decide if the heartbeats is recovered for a master, will wait for all 
the assignments nodes to recovery, every node will take care
+     * its node heartbeats reporting.
+     *
      * @return true if all nodes have reported heartbeats or exceeds 
max-time-out
      */
     private boolean isHeartbeatsRecovered() {
-        if(heartbeatsReadyFlag.get()) {
+        if (heartbeatsReadyFlag.get()) {
             return true;
         }
         Set<String> allNodes = new HashSet<>();
-        for(Map.Entry<String, Assignment> assignmentEntry: 
stormClusterState.assignmentsInfo().entrySet()) {
+        for (Map.Entry<String, Assignment> assignmentEntry : 
stormClusterState.assignmentsInfo().entrySet()) {
             
allNodes.addAll(assignmentEntry.getValue().get_node_host().keySet());
         }
         boolean isReady = heartbeatsRecoveryStrategy.isReady(allNodes);
-        if(isReady) {
+        if (isReady) {
             heartbeatsReadyFlag.getAndSet(true);
         }
         return isReady;
@@ -1567,12 +1612,13 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
 
     /**
      * Decide if the assignments is synchronized.
+     *
      * @return true if assignments have been synchronized from remote state 
store
      */
     private boolean isAssignmentsRecovered() {
         return stormClusterState.isAssignmentsBackendSynchronized();
     }
-    
+
     private Set<List<Integer>> aliveExecutors(TopologyDetails td, 
Set<List<Integer>> allExecutors, Assignment assignment) {
         String topoId = td.getId();
         Map<List<Integer>, Map<String, Object>> hbCache = 
heartbeatsCache.get().get(topoId);
@@ -1581,13 +1627,13 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             hbCache = new HashMap<>();
         }
         LOG.debug("NEW  Computing alive executors for {}\nExecutors: 
{}\nAssignment: {}\nHeartbeat cache: {}",
-                topoId, allExecutors, assignment, hbCache);
-        
+                  topoId, allExecutors, assignment, hbCache);
+
         int taskLaunchSecs = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
         Set<List<Integer>> ret = new HashSet<>();
         Map<List<Long>, Long> execToStartTimes = 
assignment.get_executor_start_time_secs();
 
-        for (List<Integer> exec: allExecutors) {
+        for (List<Integer> exec : allExecutors) {
             List<Long> longExec = new ArrayList<Long>(exec.size());
             for (Integer num : exec) {
                 longExec.add(num.longValue());
@@ -1596,7 +1642,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             Long startTime = execToStartTimes.get(longExec);
             Map<String, Object> executorCache = 
hbCache.get(StatsUtil.convertExecutor(longExec));
             //null isTimedOut means worker never reported any heartbeat
-            Boolean isTimedOut = executorCache == null ? null : 
(Boolean)executorCache.get("is-timed-out");
+            Boolean isTimedOut = executorCache == null ? null : (Boolean) 
executorCache.get("is-timed-out");
             Integer delta = startTime == null ? null : 
Time.deltaSecs(startTime.intValue());
             if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut 
!= null && !isTimedOut))) {
                 ret.add(exec);
@@ -1606,9 +1652,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
         return ret;
     }
-    
+
     private List<List<Integer>> computeExecutors(String topoId, StormBase 
base, Map<String, Object> topoConf,
-        StormTopology topology)
+                                                 StormTopology topology)
         throws KeyNotFoundException, AuthorizationException, IOException, 
InvalidTopologyException {
         assert (base != null);
 
@@ -1617,13 +1663,13 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         if (compToExecutors != null) {
             Map<Integer, String> taskInfo = 
StormCommon.stormTaskInfo(topology, topoConf);
             Map<String, List<Integer>> compToTaskList = 
Utils.reverseMap(taskInfo);
-            for (Entry<String, List<Integer>> entry: 
compToTaskList.entrySet()) {
+            for (Entry<String, List<Integer>> entry : 
compToTaskList.entrySet()) {
                 List<Integer> comps = entry.getValue();
                 comps.sort(null);
                 Integer numExecutors = compToExecutors.get(entry.getKey());
                 if (numExecutors != null) {
                     List<List<Integer>> partitioned = 
Utils.partitionFixed(numExecutors, comps);
-                    for (List<I

<TRUNCATED>

Reply via email to