http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 6581e04..6d40db9 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -24,7 +24,6 @@ import com.codahale.metrics.Meter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -57,7 +56,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.security.auth.Subject; - import org.apache.curator.framework.CuratorFramework; import org.apache.storm.Config; import org.apache.storm.Constants; @@ -144,11 +142,11 @@ import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.metric.api.DataPoint; import org.apache.storm.metric.api.IClusterMetricsConsumer; import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo; -import org.apache.storm.nimbus.AssignmentDistributionService; import org.apache.storm.metricstore.AggLevel; import org.apache.storm.metricstore.Metric; import org.apache.storm.metricstore.MetricStore; import org.apache.storm.metricstore.MetricStoreConfig; +import org.apache.storm.nimbus.AssignmentDistributionService; import org.apache.storm.nimbus.DefaultTopologyValidator; import org.apache.storm.nimbus.ILeaderElector; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; @@ -211,8 +209,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Nimbus implements Iface, Shutdownable, DaemonCommon { + @VisibleForTesting + public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0), + new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE)); + public static final SimpleVersion MIN_VERSION_SUPPORT_RPC_HEARTBEAT = new SimpleVersion("2.0.0"); private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class); - // Metrics private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls"); private static final Meter submitTopologyCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopology-calls"); @@ -224,7 +225,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls"); private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls"); private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter( - "nimbus:num-getComponentPendingProfileActions-calls"); + "nimbus:num-getComponentPendingProfileActions-calls"); private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls"); private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls"); private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls"); @@ -241,59 +242,47 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter getLeaderCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls"); private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls"); private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter( - "nimbus:num-getTopologyInfoWithOpts-calls"); + "nimbus:num-getTopologyInfoWithOpts-calls"); private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls"); private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls"); private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls"); private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls"); private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", - new ExponentiallyDecayingReservoir()); + new ExponentiallyDecayingReservoir()); private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter( - "nimbus:num-getOwnerResourceSummaries-calls"); + "nimbus:num-getOwnerResourceSummaries-calls"); + // END Metrics private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls"); private static final Meter processWorkerMetricsCalls = StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls"); - // END Metrics - private static final String STORM_VERSION = VersionInfo.getVersion(); - - @VisibleForTesting - public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0), - new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE)); - - public static final SimpleVersion MIN_VERSION_SUPPORT_RPC_HEARTBEAT = new SimpleVersion("2.0.0"); - - private static List<ACL> getNimbusAcls(Map<String, Object> conf) { - List<ACL> acls = null; - if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { - acls = ZK_ACLS; - } - return acls; - } - private static final Subject NIMBUS_SUBJECT = new Subject(); - - static { - NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal()); - NIMBUS_SUBJECT.setReadOnly(); - } - - // TOPOLOGY STATE TRANSITIONS - private static StormBase make(TopologyStatus status) { - StormBase ret = new StormBase(); - ret.set_status(status); - //The following are required for backwards compatibility with clojure code - ret.set_component_executors(Collections.emptyMap()); - ret.set_component_debug(Collections.emptyMap()); - return ret; - } - private static final TopologyStateTransition NOOP_TRANSITION = (arg, nimbus, topoId, base) -> null; private static final TopologyStateTransition INACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE); private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE); + private static final TopologyStateTransition REMOVE_TRANSITION = (args, nimbus, topoId, base) -> { + LOG.info("Killing topology: {}", topoId); + IStormClusterState state = nimbus.getStormClusterState(); + Assignment oldAssignment = state.assignmentInfo(topoId, null); + state.removeStorm(topoId); + notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer()); + BlobStore store = nimbus.getBlobStore(); + if (store instanceof LocalFsBlobStore) { + for (String key : Nimbus.getKeyListFromId(nimbus.getConf(), topoId)) { + state.removeBlobstoreKey(key); + state.removeKeyVersion(key); + } + } + nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId)); + return null; + }; + private static final TopologyStateTransition DO_REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> { + nimbus.doRebalance(topoId, base); + return Nimbus.make(base.get_prev_status()); + }; private static final TopologyStateTransition KILL_TRANSITION = (killTime, nimbus, topoId, base) -> { int delay = 0; if (killTime != null) { - delay = ((Number)killTime).intValue(); + delay = ((Number) killTime).intValue(); } else { delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); } @@ -309,7 +298,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { sb.set_component_debug(Collections.emptyMap()); return sb; }; - + private static final TopologyStateTransition REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> { RebalanceOptions rbo = ((RebalanceOptions) args).deepCopy(); int delay = 0; @@ -319,12 +308,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); } nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null); - + rbo.set_wait_secs(delay); if (!rbo.is_set_num_executors()) { rbo.set_num_executors(Collections.emptyMap()); } - + StormBase sb = new StormBase(); sb.set_status(TopologyStatus.REBALANCING); sb.set_prev_status(base.get_status()); @@ -333,175 +322,228 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { sb.set_topology_action_options(tao); sb.set_component_executors(Collections.emptyMap()); sb.set_component_debug(Collections.emptyMap()); - + return sb; }; - private static final TopologyStateTransition STARTUP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> { int delay = base.get_topology_action_options().get_kill_options().get_wait_secs(); nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null); return null; }; - - private static final TopologyStateTransition REMOVE_TRANSITION = (args, nimbus, topoId, base) -> { - LOG.info("Killing topology: {}", topoId); - IStormClusterState state = nimbus.getStormClusterState(); - Assignment oldAssignment = state.assignmentInfo(topoId, null); - state.removeStorm(topoId); - notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer()); - BlobStore store = nimbus.getBlobStore(); - if (store instanceof LocalFsBlobStore) { - for (String key: Nimbus.getKeyListFromId(nimbus.getConf(), topoId)) { - state.removeBlobstoreKey(key); - state.removeKeyVersion(key); - } - } - nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId)); - return null; - }; - private static final TopologyStateTransition STARTUP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> { int delay = base.get_topology_action_options().get_rebalance_options().get_wait_secs(); nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null); return null; }; - - private static final TopologyStateTransition DO_REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> { - nimbus.doRebalance(topoId, base); - return Nimbus.make(base.get_prev_status()); - }; - private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS = - new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, TopologyStateTransition>>() + new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, TopologyStateTransition>>() .put(TopologyStatus.ACTIVE, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>() - .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION) - .put(TopologyActions.ACTIVATE, NOOP_TRANSITION) - .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION) - .put(TopologyActions.KILL, KILL_TRANSITION) - .build()) + .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION) + .put(TopologyActions.ACTIVATE, NOOP_TRANSITION) + .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION) + .put(TopologyActions.KILL, KILL_TRANSITION) + .build()) .put(TopologyStatus.INACTIVE, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>() - .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION) - .put(TopologyActions.INACTIVATE, NOOP_TRANSITION) - .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION) - .put(TopologyActions.KILL, KILL_TRANSITION) - .build()) + .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION) + .put(TopologyActions.INACTIVATE, NOOP_TRANSITION) + .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION) + .put(TopologyActions.KILL, KILL_TRANSITION) + .build()) .put(TopologyStatus.KILLED, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>() - .put(TopologyActions.STARTUP, STARTUP_WHEN_KILLED_TRANSITION) - .put(TopologyActions.KILL, KILL_TRANSITION) - .put(TopologyActions.REMOVE, REMOVE_TRANSITION) - .build()) + .put(TopologyActions.STARTUP, STARTUP_WHEN_KILLED_TRANSITION) + .put(TopologyActions.KILL, KILL_TRANSITION) + .put(TopologyActions.REMOVE, REMOVE_TRANSITION) + .build()) .put(TopologyStatus.REBALANCING, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>() - .put(TopologyActions.STARTUP, STARTUP_WHEN_REBALANCING_TRANSITION) - .put(TopologyActions.KILL, KILL_TRANSITION) - .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION) - .build()) + .put(TopologyActions.STARTUP, STARTUP_WHEN_REBALANCING_TRANSITION) + .put(TopologyActions.KILL, KILL_TRANSITION) + .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION) + .build()) .build(); - + private static final List<String> EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList()); + private static final Set<String> EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet()); + private static final Pattern TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$"); + // END TOPOLOGY STATE TRANSITIONS - - private static final class Assoc<K,V> implements UnaryOperator<Map<K, V>> { - private final K key; - private final V value; - - public Assoc(K key, V value) { - this.key = key; - this.value = value; - } - - @Override - public Map<K, V> apply(Map<K, V> t) { - Map<K, V> ret = new HashMap<>(t); - ret.put(key, value); - return ret; - } + + static { + NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal()); + NIMBUS_SUBJECT.setReadOnly(); } - - private static final class Dissoc<K,V> implements UnaryOperator<Map<K, V>> { - private final K key; - - public Dissoc(K key) { - this.key = key; - } - - @Override - public Map<K, V> apply(Map<K, V> t) { - Map<K, V> ret = new HashMap<>(t); - ret.remove(key); - return ret; - } + + private final Map<String, Object> conf; + private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths; + private final NimbusInfo nimbusHostPortInfo; + private final INimbus inimbus; + private final IAuthorizer impersonationAuthorizationHandler; + private final AtomicLong submittedCount; + private final IStormClusterState stormClusterState; + private final Object submitLock = new Object(); + private final Object schedLock = new Object(); + private final Object credUpdateLock = new Object(); + private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache; + private final AtomicBoolean heartbeatsReadyFlag; + private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy; + @SuppressWarnings("deprecation") + private final TimeCacheMap<String, BufferInputStream> downloaders; + @SuppressWarnings("deprecation") + private final TimeCacheMap<String, WritableByteChannel> uploaders; + private final BlobStore blobStore; + private final TopoCache topoCache; + @SuppressWarnings("deprecation") + private final TimeCacheMap<String, BufferInputStream> blobDownloaders; + @SuppressWarnings("deprecation") + private final TimeCacheMap<String, OutputStream> blobUploaders; + @SuppressWarnings("deprecation") + private final TimeCacheMap<String, Iterator<String>> blobListers; + private final UptimeComputer uptime; + private final ITopologyValidator validator; + private final StormTimer timer; + private final IScheduler scheduler; + private final IScheduler underlyingScheduler; + private final ILeaderElector leaderElector; + private final AssignmentDistributionService assignmentsDistributer; + private final AtomicReference<Map<String, String>> idToSchedStatus; + private final AtomicReference<Map<String, SupervisorResources>> nodeIdToResources; + private final AtomicReference<Map<String, TopologyResources>> idToResources; + private final AtomicReference<Map<String, Map<WorkerSlot, WorkerResources>>> idToWorkerResources; + private final Collection<ICredentialsRenewer> credRenewers; + private final Object topologyHistoryLock; + private final LocalState topologyHistoryState; + private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins; + private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier; + private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors; + private final IGroupMappingServiceProvider groupMapper; + private final IPrincipalToLocal principalToLocal; + private MetricStore metricsStore; + private IAuthorizer authorizationHandler; + //Cached CuratorFramework, mainly used for BlobStore. + private CuratorFramework zkClient; + //May be null if worker tokens are not supported by the thrift transport. + private WorkerTokenManager workerTokenManager; + + public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception { + this(conf, inimbus, null, null, null, null, null); } - - @VisibleForTesting - public static class StandaloneINimbus implements INimbus { - @Override - public void prepare(Map<String, Object> topoConf, String schedulerLocalDir) { - //NOOP - } + public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo, + BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception { + this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, leaderElector, groupMapper); + } - @SuppressWarnings("unchecked") - @Override - public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> supervisors, - Topologies topologies, Set<String> topologiesMissingAssignments) { - Set<WorkerSlot> ret = new HashSet<>(); - for (SupervisorDetails sd: supervisors) { - String id = sd.getId(); - for (Number port: (Collection<Number>)sd.getMeta()) { - ret.add(new WorkerSlot(id, port)); - } - } - return ret; + public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo, + BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) + throws Exception { + this.conf = conf; + + this.metricsStore = null; + try { + this.metricsStore = MetricStoreConfig.configure(conf); + } catch (Exception e) { + // the metrics store is not critical to the operation of the cluster, allow Nimbus to come up + LOG.error("Failed to initialize metric store", e); } - @Override - public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) { - //NOOP + if (hostPortInfo == null) { + hostPortInfo = NimbusInfo.fromConf(conf); + } + this.nimbusHostPortInfo = hostPortInfo; + if (inimbus != null) { + inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf)); } - @Override - public String getHostName(Map<String, SupervisorDetails> supervisors, String nodeId) { - SupervisorDetails sd = supervisors.get(nodeId); - if (sd != null) { - return sd.getHost(); - } - return null; + this.inimbus = inimbus; + this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf); + this.impersonationAuthorizationHandler = + StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf); + this.submittedCount = new AtomicLong(0); + if (stormClusterState == null) { + stormClusterState = makeStormClusterState(conf); + } + this.stormClusterState = stormClusterState; + this.heartbeatsCache = new AtomicReference<>(new HashMap<>()); + this.heartbeatsReadyFlag = new AtomicBoolean(false); + this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf); + this.downloaders = fileCacheMap(conf); + this.uploaders = fileCacheMap(conf); + if (blobStore == null) { + blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo); + } + this.blobStore = blobStore; + if (topoCache == null) { + topoCache = new TopoCache(blobStore, conf); + } + this.topoCache = topoCache; + this.blobDownloaders = makeBlobCacheMap(conf); + this.blobUploaders = makeBlobCacheMap(conf); + this.blobListers = makeBlobListCacheMap(conf); + this.uptime = Utils.makeUptimeComputer(); + this.validator = ReflectionUtils + .newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName())); + this.timer = new StormTimer(null, (t, e) -> { + LOG.error("Error while processing event", e); + Utils.exitProcess(20, "Error while processing event"); + }); + this.underlyingScheduler = makeScheduler(conf, inimbus); + this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler); + this.zkClient = makeZKClient(conf); + if (leaderElector == null) { + leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf)); } + this.leaderElector = leaderElector; + this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf); + this.idToSchedStatus = new AtomicReference<>(new HashMap<>()); + this.nodeIdToResources = new AtomicReference<>(new HashMap<>()); + this.idToResources = new AtomicReference<>(new HashMap<>()); + this.idToWorkerResources = new AtomicReference<>(new HashMap<>()); + this.credRenewers = AuthUtils.GetCredentialRenewers(conf); + this.topologyHistoryLock = new Object(); + this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(conf); + this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf); + this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf); + this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(conf); + if (groupMapper == null) { + groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf); + } + this.groupMapper = groupMapper; + this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf); + this.supervisorClasspaths = Collections.unmodifiableNavigableMap( + Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list + } - @Override - public IScheduler getForcedScheduler() { - return null; + private static List<ACL> getNimbusAcls(Map<String, Object> conf) { + List<ACL> acls = null; + if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { + acls = ZK_ACLS; } - + return acls; } - - private static class CommonTopoInfo { - public Map<String, Object> topoConf; - public String topoName; - public StormTopology topology; - public Map<Integer, String> taskToComponent; - public StormBase base; - public int launchTimeSecs; - public Assignment assignment; - public Map<List<Integer>, Map<String, Object>> beats; - public HashSet<String> allComponents; + // TOPOLOGY STATE TRANSITIONS + private static StormBase make(TopologyStatus status) { + StormBase ret = new StormBase(); + ret.set_status(status); + //The following are required for backwards compatibility with clojure code + ret.set_component_executors(Collections.emptyMap()); + ret.set_component_debug(Collections.emptyMap()); + return ret; } - + @SuppressWarnings("deprecation") private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600), - (id, stream) -> { - try { - stream.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + (id, stream) -> { + try { + stream.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) { Map<K, V> ret = new HashMap<>(); - for (Entry<? extends K, ? extends V> entry: second.entrySet()) { + for (Entry<? extends K, ? extends V> entry : second.entrySet()) { if (!entry.getValue().equals(first.get(entry.getKey()))) { ret.put(entry.getKey(), entry.getValue()); } @@ -531,30 +573,31 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } /** - * Constructs a TimeCacheMap instance with a blob store timeout whose - * expiration callback invokes cancel on the value held by an expired entry when - * that value is an AtomicOutputStream and calls close otherwise. + * Constructs a TimeCacheMap instance with a blob store timeout whose expiration callback invokes cancel on the value held by an expired + * entry when that value is an AtomicOutputStream and calls close otherwise. + * * @param conf the config to use * @return the newly created map */ @SuppressWarnings("deprecation") private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600), - (id, stream) -> { - try { - if (stream instanceof AtomicOutputStream) { - ((AtomicOutputStream) stream).cancel(); - } else { - stream.close(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + (id, stream) -> { + try { + if (stream instanceof AtomicOutputStream) { + ((AtomicOutputStream) stream).cancel(); + } else { + stream.close(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } - + /** * Constructs a TimeCacheMap instance with a blobstore timeout and no callback function. + * * @param conf the config to use * @return the newly created TimeCacheMap */ @@ -562,7 +605,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> conf) { return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600)); } - + private static ITopologyActionNotifierPlugin createTopologyActionNotifier(Map<String, Object> conf) { String clazz = (String) conf.get(DaemonConfig.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN); ITopologyActionNotifierPlugin ret = null; @@ -577,7 +620,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + @SuppressWarnings("unchecked") private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) { Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get( @@ -590,16 +633,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private static Subject getSubject() { return ReqContext.context().subject(); } - + static Map<String, Object> readTopoConf(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { return tc.readTopoConf(topoId, getSubject()); } - + static List<String> getKeyListFromId(Map<String, Object> conf, String id) { List<String> ret = new ArrayList<>(3); ret.add(ConfigUtils.masterStormCodeKey(id)); @@ -609,56 +652,57 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private static int getVersionForKey(String key, NimbusInfo nimbusInfo, - CuratorFramework zkClient) throws KeyNotFoundException { + CuratorFramework zkClient) throws KeyNotFoundException { KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo); return kseq.getKeySequenceNumber(zkClient); } - + private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { return tc.readTopology(topoId, getSubject()); } - + private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { return tc.readTopoConf(topoId, NIMBUS_SUBJECT); } - + private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException { return tc.readTopology(topoId, NIMBUS_SUBJECT); } - + /** - * convert {topology-id -> SchedulerAssignment} to - * {topology-id -> {executor [node port]}}. + * convert {topology-id -> SchedulerAssignment} to {topology-id -> {executor [node port]}}. + * * @return {topology-id -> {executor [node port]}} mapping */ - private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) { + private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort( + Map<String, SchedulerAssignment> schedAssignments) { Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>(); - for (Entry<String, SchedulerAssignment> schedEntry: schedAssignments.entrySet()) { + for (Entry<String, SchedulerAssignment> schedEntry : schedAssignments.entrySet()) { Map<List<Long>, List<Object>> execToNodePort = new HashMap<>(); - for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort: schedEntry.getValue().getExecutorToSlot().entrySet()) { + for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : schedEntry.getValue().getExecutorToSlot().entrySet()) { ExecutorDetails exec = execAndNodePort.getKey(); WorkerSlot slot = execAndNodePort.getValue(); - + List<Long> listExec = new ArrayList<>(2); listExec.add((long) exec.getStartTask()); listExec.add((long) exec.getEndTask()); - + List<Object> nodePort = new ArrayList<>(2); nodePort.add(slot.getNodeId()); - nodePort.add((long)slot.getPort()); - + nodePort.add((long) slot.getPort()); + execToNodePort.put(listExec, nodePort); } ret.put(schedEntry.getKey(), execToNodePort); } return ret; } - + private static int numUsedWorkers(SchedulerAssignment assignment) { if (assignment == null) { return 0; @@ -667,8 +711,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } /** - * Convert {topology-id -> SchedulerAssignment} to {topology-id -> {WorkerSlot WorkerResources}}. Make sure this - * can deal with other non-RAS schedulers later we may further support map-for-any-resources. + * Convert {topology-id -> SchedulerAssignment} to {topology-id -> {WorkerSlot WorkerResources}}. Make sure this can deal with other + * non-RAS schedulers later we may further support map-for-any-resources. * * @param schedAssignments the assignments * @return The resources used per slot @@ -687,7 +731,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments); // Print some useful information if (existingAssignments != null && !existingAssignments.isEmpty()) { - for (Entry<String, Map<List<Long>, List<Object>>> entry: ret.entrySet()) { + for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) { String topoId = entry.getKey(); Map<List<Long>, List<Object>> execToNodePort = entry.getValue(); Assignment assignment = existingAssignments.get(topoId); @@ -696,12 +740,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port(); Map<List<Long>, List<Object>> reassigned = new HashMap<>(); - for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) { + for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) { NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); String node = (String) execAndNodePort.getValue().get(0); Long port = (Long) execAndNodePort.getValue().get(1); - if (oldAssigned == null || !oldAssigned.get_node().equals(node) - || !port.equals(oldAssigned.get_port_iterator().next())) { + if (oldAssigned == null || !oldAssigned.get_node().equals(node) + || !port.equals(oldAssigned.get_port_iterator().next())) { reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); } } @@ -716,12 +760,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>, List<Object>> newExecToNodePort) { HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null ? new HashMap<>() : Utils.reverseMap(map); HashMap<List<Object>, List<List<Long>>> slotAssigned = new HashMap<>(); - for (Entry<NodeInfo, List<List<Long>>> entry: tmpSlotAssigned.entrySet()) { + for (Entry<NodeInfo, List<List<Long>>> entry : tmpSlotAssigned.entrySet()) { NodeInfo ni = entry.getKey(); List<Object> key = new ArrayList<>(2); key.add(ni.get_node()); @@ -731,16 +775,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { slotAssigned.put(key, value); } HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : - Utils.reverseMap(newExecToNodePort); + Utils.reverseMap(newExecToNodePort); HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>(); - for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) { + for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) { List<List<Long>> value = new ArrayList<>(entry.getValue()); value.sort((a, b) -> a.get(0).compareTo(b.get(0))); newSlotAssigned.put(entry.getKey(), value); } Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned); List<List<Long>> ret = new ArrayList<>(); - for (List<List<Long>> val: diff.values()) { + for (List<List<Long>> val : diff.values()) { ret.addAll(val); } return ret; @@ -751,27 +795,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Set<NodeInfo> niRet = new HashSet<>(current.get_executor_node_port().values()); niRet.removeAll(oldSlots); Set<WorkerSlot> ret = new HashSet<>(); - for (NodeInfo ni: niRet) { + for (NodeInfo ni : niRet) { ret.add(new WorkerSlot(ni.get_node(), ni.get_port_iterator().next())); } return ret; } - + private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormClusterState state) { Map<String, SupervisorDetails> ret = new HashMap<>(); - for (Entry<String, SupervisorInfo> entry: state.allSupervisorInfo().entrySet()) { + for (Entry<String, SupervisorInfo> entry : state.allSupervisorInfo().entrySet()) { String id = entry.getKey(); SupervisorInfo info = entry.getValue(); ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(), - info.get_scheduler_meta(), null, info.get_resources_map())); + info.get_scheduler_meta(), null, info.get_resources_map())); } return ret; } - + private static boolean isTopologyActive(IStormClusterState state, String topoName) { return state.getTopoId(topoName).isPresent(); } - + private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException { try { @@ -785,10 +829,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { throw new NotAliveException(topoId); } } - - private static final List<String> EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList()); - private static final Set<String> EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet()); - + @VisibleForTesting public static Set<String> topoIdsToClean(IStormClusterState state, BlobStore store) { Set<String> ret = new HashSet<>(); @@ -800,7 +841,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST)); return ret; } - + private static String extractStatusStr(StormBase base) { String ret = null; if (base != null) { @@ -815,7 +856,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static StormTopology normalizeTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { StormTopology ret = topology.deepCopy(); - for (Object comp: StormCommon.allComponents(ret).values()) { + for (Object comp : StormCommon.allComponents(ret).values()) { Map<String, Object> mergedConf = StormCommon.componentConf(comp); mergedConf.put(Config.TOPOLOGY_TASKS, ServerUtils.getComponentParallelism(topoConf, comp)); String jsonConf = JSONValue.toJSONString(mergedConf); @@ -823,47 +864,47 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private static void addToDecorators(Set<String> decorators, List<String> conf) { if (conf != null) { decorators.addAll(conf); } } - + @SuppressWarnings("unchecked") private static void addToSerializers(Map<String, String> ser, List<Object> conf) { if (conf != null) { - for (Object o: conf) { + for (Object o : conf) { if (o instanceof Map) { - ser.putAll((Map<String,String>)o); + ser.putAll((Map<String, String>) o); } else { - ser.put((String)o, null); + ser.put((String) o, null); } } } } - + @SuppressWarnings("unchecked") - private static Map<String, Object> normalizeConf(Map<String,Object> conf, Map<String, Object> topoConf, StormTopology topology) { + private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) { //ensure that serializations are same for all tasks no matter what's on // the supervisors. this also allows you to declare the serializations as a sequence List<Map<String, Object>> allConfs = new ArrayList<>(); - for (Object comp: StormCommon.allComponents(topology).values()) { + for (Object comp : StormCommon.allComponents(topology).values()) { allConfs.add(StormCommon.componentConf(comp)); } Set<String> decorators = new HashSet<>(); //Yes we are putting in a config that is not the same type we pulled out. Map<String, String> serializers = new HashMap<>(); - for (Map<String, Object> c: allConfs) { + for (Map<String, Object> c : allConfs) { addToDecorators(decorators, (List<String>) c.get(Config.TOPOLOGY_KRYO_DECORATORS)); addToSerializers(serializers, (List<Object>) c.get(Config.TOPOLOGY_KRYO_REGISTER)); } - addToDecorators(decorators, (List<String>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS, - conf.get(Config.TOPOLOGY_KRYO_DECORATORS))); - addToSerializers(serializers, (List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, - conf.get(Config.TOPOLOGY_KRYO_REGISTER))); - + addToDecorators(decorators, (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS, + conf.get(Config.TOPOLOGY_KRYO_DECORATORS))); + addToSerializers(serializers, (List<Object>) topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, + conf.get(Config.TOPOLOGY_KRYO_REGISTER))); + Map<String, Object> mergedConf = Utils.merge(conf, topoConf); Map<String, Object> ret = new HashMap<>(topoConf); ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers); @@ -873,7 +914,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); return ret; } - + private static void rmBlobKey(BlobStore store, String key, IStormClusterState state) { try { store.deleteBlob(key, NIMBUS_SUBJECT); @@ -885,10 +926,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { LOG.info("Exception {}", e); } } - + /** * Deletes jar files in dirLoc older than seconds. - * @param dirLoc the location to look in for file + * + * @param dirLoc the location to look in for file * @param seconds how old is too old and should be deleted */ @VisibleForTesting @@ -904,12 +946,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } } - + private static ExecutorInfo toExecInfo(List<Long> exec) { return new ExecutorInfo(exec.get(0).intValue(), exec.get(1).intValue()); } - - private static final Pattern TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$"); private static void validateTopologyName(String name) throws InvalidTopologyException { Matcher m = TOPOLOGY_NAME_REGEX.matcher(name); @@ -919,16 +959,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } private static StormTopology tryReadTopology(String topoId, TopoCache tc) - throws NotAliveException, AuthorizationException, IOException { + throws NotAliveException, AuthorizationException, IOException { try { return readStormTopologyAsNimbus(topoId, tc); } catch (KeyNotFoundException e) { throw new NotAliveException(topoId); } } - + private static void validateTopologySize(Map<String, Object> topoConf, Map<String, Object> nimbusConf, - StormTopology topology) throws InvalidTopologyException { + StormTopology topology) throws InvalidTopologyException { int workerCount = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 1); Integer allowedWorkers = ObjectReader.getInt(nimbusConf.get(DaemonConfig.NIMBUS_SLOTS_PER_TOPOLOGY), null); int executorsCount = 0; @@ -938,15 +978,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Integer allowedExecutors = ObjectReader.getInt(nimbusConf.get(DaemonConfig.NIMBUS_EXECUTORS_PER_TOPOLOGY), null); if (allowedExecutors != null && executorsCount > allowedExecutors) { throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " + - allowedExecutors + " executors."); + allowedExecutors + " executors."); } - + if (allowedWorkers != null && workerCount > allowedWorkers) { throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " + - allowedWorkers + " workers."); + allowedWorkers + " workers."); } } - + private static void setLoggerTimeouts(LogLevel level) { int timeoutSecs = level.get_reset_log_level_timeout_secs(); if (timeoutSecs > 0) { @@ -955,45 +995,45 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { level.unset_reset_log_level_timeout_epoch(); } } - + @VisibleForTesting public static List<String> topologiesOnSupervisor(Map<String, Assignment> assignments, String supervisorId) { Set<String> ret = new HashSet<>(); - for (Entry<String, Assignment> entry: assignments.entrySet()) { + for (Entry<String, Assignment> entry : assignments.entrySet()) { Assignment assignment = entry.getValue(); - for (NodeInfo nodeInfo: assignment.get_executor_node_port().values()) { + for (NodeInfo nodeInfo : assignment.get_executor_node_port().values()) { if (supervisorId.equals(nodeInfo.get_node())) { ret.add(entry.getKey()); break; } } } - + return new ArrayList<>(ret); } - + private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() { return new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs()); } - + private static List<DataPoint> extractClusterMetrics(ClusterSummary summ) { List<DataPoint> ret = new ArrayList<>(); ret.add(new DataPoint("supervisors", summ.get_supervisors_size())); ret.add(new DataPoint("topologies", summ.get_topologies_size())); - + int totalSlots = 0; int usedSlots = 0; - for (SupervisorSummary sup: summ.get_supervisors()) { + for (SupervisorSummary sup : summ.get_supervisors()) { usedSlots += sup.get_num_used_workers(); totalSlots += sup.get_num_workers(); } ret.add(new DataPoint("slotsTotal", totalSlots)); ret.add(new DataPoint("slotsUsed", usedSlots)); ret.add(new DataPoint("slotsFree", totalSlots - usedSlots)); - + int totalExecutors = 0; int totalTasks = 0; - for (TopologySummary topo: summ.get_topologies()) { + for (TopologySummary topo : summ.get_topologies()) { totalExecutors += topo.get_num_executors(); totalTasks += topo.get_num_tasks(); } @@ -1004,8 +1044,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> extractSupervisorMetrics(ClusterSummary summ) { Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> ret = new HashMap<>(); - for (SupervisorSummary sup: summ.get_supervisors()) { - IClusterMetricsConsumer.SupervisorInfo info = new IClusterMetricsConsumer.SupervisorInfo(sup.get_host(), sup.get_supervisor_id(), Time.currentTimeSecs()); + for (SupervisorSummary sup : summ.get_supervisors()) { + IClusterMetricsConsumer.SupervisorInfo info = + new IClusterMetricsConsumer.SupervisorInfo(sup.get_host(), sup.get_supervisor_id(), Time.currentTimeSecs()); List<DataPoint> metrics = new ArrayList<>(); metrics.add(new DataPoint("slotsTotal", sup.get_num_workers())); metrics.add(new DataPoint("slotsUsed", sup.get_num_used_workers())); @@ -1017,7 +1058,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private static void setResourcesDefaultIfNotSet(Map<String, NormalizedResourceRequest> compResourcesMap, String compId, Map<String, Object> topoConf) { NormalizedResourceRequest resources = compResourcesMap.get(compId); @@ -1025,7 +1066,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { compResourcesMap.put(compId, new NormalizedResourceRequest(topoConf)); } } - + private static void validatePortAvailable(Map<String, Object> conf) throws IOException { int port = ObjectReader.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)); try (ServerSocket socket = new ServerSocket(port)) { @@ -1035,7 +1076,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { System.exit(0); } } - + private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception { StormCommon.validateDistributedMode(conf); validatePortAvailable(conf); @@ -1056,7 +1097,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { public static Nimbus launch(INimbus inimbus) throws Exception { Map<String, Object> conf = Utils.merge(Utils.readStormConfig(), - ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false)); + ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false)); boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP); boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK); if (checkAcl) { @@ -1064,168 +1105,165 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return launchServer(conf, inimbus); } - + public static void main(String[] args) throws Exception { Utils.setupDefaultUncaughtExceptionHandler(); launch(new StandaloneINimbus()); } - - private final Map<String, Object> conf; - private MetricStore metricsStore; - private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths; - private final NimbusInfo nimbusHostPortInfo; - private final INimbus inimbus; - private IAuthorizer authorizationHandler; - private final IAuthorizer impersonationAuthorizationHandler; - private final AtomicLong submittedCount; - //Cached CuratorFramework, mainly used for BlobStore. - private CuratorFramework zkClient; - private final IStormClusterState stormClusterState; - private final Object submitLock = new Object(); - private final Object schedLock = new Object(); - private final Object credUpdateLock = new Object(); - private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache; - private final AtomicBoolean heartbeatsReadyFlag; - private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy; - @SuppressWarnings("deprecation") - private final TimeCacheMap<String, BufferInputStream> downloaders; - @SuppressWarnings("deprecation") - private final TimeCacheMap<String, WritableByteChannel> uploaders; - private final BlobStore blobStore; - private final TopoCache topoCache; - @SuppressWarnings("deprecation") - private final TimeCacheMap<String, BufferInputStream> blobDownloaders; - @SuppressWarnings("deprecation") - private final TimeCacheMap<String, OutputStream> blobUploaders; - @SuppressWarnings("deprecation") - private final TimeCacheMap<String, Iterator<String>> blobListers; - private final UptimeComputer uptime; - private final ITopologyValidator validator; - private final StormTimer timer; - private final IScheduler scheduler; - private final IScheduler underlyingScheduler; - private final ILeaderElector leaderElector; - private final AssignmentDistributionService assignmentsDistributer; - private final AtomicReference<Map<String, String>> idToSchedStatus; - private final AtomicReference<Map<String, SupervisorResources>> nodeIdToResources; - private final AtomicReference<Map<String, TopologyResources>> idToResources; - private final AtomicReference<Map<String, Map<WorkerSlot, WorkerResources>>> idToWorkerResources; - private final Collection<ICredentialsRenewer> credRenewers; - private final Object topologyHistoryLock; - private final LocalState topologyHistoryState; - private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins; - private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier; - private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors; - private final IGroupMappingServiceProvider groupMapper; - private final IPrincipalToLocal principalToLocal; - //May be null if worker tokens are not supported by the thrift transport. - private WorkerTokenManager workerTokenManager; - private static CuratorFramework makeZKClient(Map<String, Object> conf) { - List<String> servers = (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS); - Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); - String root = (String)conf.get(Config.STORM_ZOOKEEPER_ROOT); - CuratorFramework ret = null; - if (servers != null && port != null) { - ret = ClientZookeeper.mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS); + private static CuratorFramework makeZKClient(Map<String, Object> conf) { + List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); + String root = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT); + CuratorFramework ret = null; + if (servers != null && port != null) { + ret = ClientZookeeper.mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS); + } + return ret; + } + + private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception { + return ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + } + + private static List<Integer> asIntExec(List<Long> exec) { + List<Integer> ret = new ArrayList<>(2); + ret.add(exec.get(0).intValue()); + ret.add(exec.get(1).intValue()); + return ret; + } + + /** + * Diff old/new assignment to find nodes which assigned assignments has changed. + * + * @param oldAss old assigned assignment + * @param newAss new assigned assignment + * @return nodeId -> host map of assignments changed nodes + */ + private static Map<String, String> assignmentChangedNodes(Assignment oldAss, Assignment newAss) { + Map<List<Long>, NodeInfo> oldExecutorNodePort = null; + Map<List<Long>, NodeInfo> newExecutorNodePort = null; + Map<String, String> allNodeHost = new HashMap<>(); + if (oldAss != null) { + oldExecutorNodePort = oldAss.get_executor_node_port(); + allNodeHost.putAll(oldAss.get_node_host()); + } + if (newAss != null) { + newExecutorNodePort = newAss.get_executor_node_port(); + allNodeHost.putAll(newAss.get_node_host()); + } + //kill or newly submit + if (oldAss == null || newAss == null) { + return allNodeHost; + } else { + // rebalance + Map<String, String> ret = new HashMap(); + for (Map.Entry<List<Long>, NodeInfo> entry : newExecutorNodePort.entrySet()) { + NodeInfo newNodeInfo = entry.getValue(); + NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey()); + if (null != oldNodeInfo) { + if (!oldNodeInfo.equals(newNodeInfo)) { + ret.put(oldNodeInfo.get_node(), allNodeHost.get(oldNodeInfo.get_node())); + ret.put(newNodeInfo.get_node(), allNodeHost.get(newNodeInfo.get_node())); + } + } else { + ret.put(newNodeInfo.get_node(), allNodeHost.get(newNodeInfo.get_node())); + } + } + + return ret; } - return ret; - } - - private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception { - return ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); } - - public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception { - this(conf, inimbus, null, null, null, null, null); + + /** + * Pick out assignments for specific node from all assignments. + * + * @param assignmentMap stormId -> assignment map + * @param nodeId supervisor/node id + * @return stormId -> assignment map for the node + */ + private static Map<String, Assignment> assignmentsForNode(Map<String, Assignment> assignmentMap, String nodeId) { + Map<String, Assignment> ret = new HashMap<>(); + assignmentMap.entrySet().stream().filter(assignmentEntry -> assignmentEntry.getValue().get_node_host().keySet() + .contains(nodeId)) + .forEach(assignmentEntry -> { ret.put(assignmentEntry.getKey(), assignmentEntry.getValue()); }); + + return ret; } - - public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo, - BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception { - this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, leaderElector, groupMapper); + + /** + * Notify supervisors/nodes assigned assignments. + * + * @param assignments assignments map for nodes + * @param service {@link AssignmentDistributionService} for distributing assignments asynchronous + * @param nodeHost node -> host map + * @param supervisorDetails nodeId -> {@link SupervisorDetails} map + */ + private static void notifySupervisorsAssignments(Map<String, Assignment> assignments, + AssignmentDistributionService service, Map<String, String> nodeHost, + Map<String, SupervisorDetails> supervisorDetails) { + for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) { + try { + String nodeId = nodeEntry.getKey(); + SupervisorAssignments supervisorAssignments = new SupervisorAssignments(); + supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey())); + SupervisorDetails details = supervisorDetails.get(nodeId); + Integer serverPort = details != null ? details.getServerPort() : null; + service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments); + } catch (Throwable tr1) { + //just skip when any error happens wait for next round assignments reassign + LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey()); + } + } } - public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo, - BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) - throws Exception { - this.conf = conf; + private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss, + AssignmentDistributionService service) { + Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null); + notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost, + basicSupervisorDetailsMap(clusterState)); + } - this.metricsStore = null; - try { - this.metricsStore = MetricStoreConfig.configure(conf); - } catch (Exception e) { - // the metrics store is not critical to the operation of the cluster, allow Nimbus to come up - LOG.error("Failed to initialize metric store", e); + @VisibleForTesting + static void validateTopologyWorkerMaxHeapSizeConfigs( + Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMB) { + double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf); + double topologyWorkerMaxHeapSize = + ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), defaultWorkerMaxHeapSizeMB); + if (topologyWorkerMaxHeapSize < largestMemReq) { + throw new IllegalArgumentException( + "Topology will not be able to be successfully scheduled: Config " + + "TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" + + topologyWorkerMaxHeapSize + + " < " + largestMemReq + " (Largest memory requirement of a component in the topology)." + + " Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount"); } + } - if (hostPortInfo == null) { - hostPortInfo = NimbusInfo.fromConf(conf); - } - this.nimbusHostPortInfo = hostPortInfo; - if (inimbus != null) { - inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf)); - } - - this.inimbus = inimbus; - this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf); - this.impersonationAuthorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf); - this.submittedCount = new AtomicLong(0); - if (stormClusterState == null) { - stormClusterState = makeStormClusterState(conf); - } - this.stormClusterState = stormClusterState; - this.heartbeatsCache = new AtomicReference<>(new HashMap<>()); - this.heartbeatsReadyFlag = new AtomicBoolean(false); - this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf); - this.downloaders = fileCacheMap(conf); - this.uploaders = fileCacheMap(conf); - if (blobStore == null) { - blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo); - } - this.blobStore = blobStore; - if (topoCache == null) { - topoCache = new TopoCache(blobStore, conf); - } - this.topoCache = topoCache; - this.blobDownloaders = makeBlobCacheMap(conf); - this.blobUploaders = makeBlobCacheMap(conf); - this.blobListers = makeBlobListCacheMap(conf); - this.uptime = Utils.makeUptimeComputer(); - this.validator = ReflectionUtils.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName())); - this.timer = new StormTimer(null, (t, e) -> { - LOG.error("Error while processing event", e); - Utils.exitProcess(20, "Error while processing event"); - }); - this.underlyingScheduler = makeScheduler(conf, inimbus); - this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler); - this.zkClient = makeZKClient(conf); - if (leaderElector == null) { - leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf)); + private static double getMaxExecutorMemoryUsageForTopo( + StormTopology topology, Map<String, Object> topologyConf) { + double largestMemoryOperator = 0.0; + for (NormalizedResourceRequest entry : + ResourceUtils.getBoltsResources(topology, topologyConf).values()) { + double memoryRequirement = entry.getTotalMemoryMb(); + if (memoryRequirement > largestMemoryOperator) { + largestMemoryOperator = memoryRequirement; + } } - this.leaderElector = leaderElector; - this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf); - this.idToSchedStatus = new AtomicReference<>(new HashMap<>()); - this.nodeIdToResources = new AtomicReference<>(new HashMap<>()); - this.idToResources = new AtomicReference<>(new HashMap<>()); - this.idToWorkerResources = new AtomicReference<>(new HashMap<>()); - this.credRenewers = AuthUtils.GetCredentialRenewers(conf); - this.topologyHistoryLock = new Object(); - this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(conf); - this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf); - this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf); - this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(conf); - if (groupMapper == null) { - groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf); + for (NormalizedResourceRequest entry : + ResourceUtils.getSpoutsResources(topology, topologyConf).values()) { + double memoryRequirement = entry.getTotalMemoryMb(); + if (memoryRequirement > largestMemoryOperator) { + largestMemoryOperator = memoryRequirement; + } } - this.groupMapper = groupMapper; - this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf); - this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list + return largestMemoryOperator; } Map<String, Object> getConf() { return conf; } - + @VisibleForTesting public void setAuthorizationHandler(IAuthorizer authorizationHandler) { this.authorizationHandler = authorizationHandler; @@ -1238,9 +1276,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private AssignmentDistributionService getAssignmentsDistributer() { return assignmentsDistributer; } - + @VisibleForTesting - public AtomicReference<Map<String,Map<List<Integer>,Map<String,Object>>>> getHeartbeatsCache() { + public AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> getHeartbeatsCache() { return heartbeatsCache; } @@ -1262,26 +1300,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private boolean isLeader() throws Exception { return leaderElector.isLeader(); } - + private void assertIsLeader() throws Exception { if (!isLeader()) { NimbusInfo leaderAddress = leaderElector.getLeader(); throw new RuntimeException("not a leader, current leader is " + leaderAddress); } } - + private String getInbox() throws IOException { return ServerConfigUtils.masterInbox(conf); } /** * Used for local cluster. + * * @param supervisor {@link org.apache.storm.daemon.supervisor.Supervisor} */ public void addSupervisor(org.apache.storm.daemon.supervisor.Supervisor supervisor) { assignmentsDistributer.addLocalSupervisor(supervisor); } - + void delayEvent(String topoId, int delaySecs, TopologyActions event, Object args) { LOG.info("Delaying event {} for {} secs for {}", event, delaySecs, topoId); timer.schedule(delaySecs, () -> { @@ -1298,11 +1337,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { StormBase updated = new StormBase(); updated.set_topology_action_options(null); updated.set_component_debug(Collections.emptyMap()); - + if (rbo.is_set_num_executors()) { updated.set_component_executors(rbo.get_num_executors()); } - + if (rbo.is_set_num_workers()) { updated.set_num_workers(rbo.get_num_workers()); } @@ -1310,12 +1349,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal())); mkAssignments(topoId); } - + private String toTopoId(String topoName) throws NotAliveException { return stormClusterState.getTopoId(topoName) - .orElseThrow(() -> new NotAliveException(topoName + " is not alive")); + .orElseThrow(() -> new NotAliveException(topoName + " is not alive")); } - + private void transitionName(String topoName, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception { transition(toTopoId(topoName), event, eventArg, errorOnNoTransition); } @@ -1323,12 +1362,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private void transition(String topoId, TopologyActions event, Object eventArg) throws Exception { transition(topoId, event, eventArg, false); } - + private void transition(String topoId, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception { LOG.info("TRANSITION: {} {} {} {}", topoId, event, eventArg, errorOnNoTransition); assertIsLeader(); - synchronized(submitLock) { + synchronized (submitLock) { IStormClusterState clusterState = stormClusterState; StormBase base = clusterState.stormBase(topoId, null); if (base == null || base.get_status() == null) { @@ -1341,7 +1380,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { if (errorOnNoTransition) { throw new RuntimeException(message); } - + if (TopologyActions.STARTUP != event) { //STARTUP is a system event so don't log an issue LOG.info(message); @@ -1355,9 +1394,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } } - + private void setupStormCode(Map<String, Object> conf, String topoId, String tmpJarLocation, - Map<String, Object> topoConf, StormTopology topology) throws Exception { + Map<String, Object> topoConf, StormTopology topology) throws Exception { Subject subject = getSubject(); IStormClusterState clusterState = stormClusterState; BlobStore store = blobStore; @@ -1398,7 +1437,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { throws AuthorizationException, IOException, KeyNotFoundException { Map<String, Object> topoConf = new HashMap<>(topoCache.readTopoConf(topoId, subject)); //Copy the data topoConf.putAll(configOverride); - topoCache.updateTopoConf(topoId, subject, topoConf); + topoCache.updateTopoConf(topoId, subject, topoConf); } private void updateBlobStore(String topoId, RebalanceOptions rbo, Subject subject) @@ -1412,7 +1451,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { updateTopologyConf(topoId, Utils.parseJson(confOverride), subject); } } - + private Integer getBlobReplicationCount(String key) throws Exception { BlobStore store = blobStore; if (store != null) { @@ -1420,7 +1459,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return null; } - + private void waitForDesiredCodeReplication(Map<String, Object> topoConf, String topoId) throws Exception { int minReplicationCount = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MIN_REPLICATION_COUNT)); int maxWaitTime = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC)); @@ -1434,12 +1473,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { //When is this ever null? if (blobStore != null) { while (jarCount < minReplicationCount && - codeCount < minReplicationCount && - confCount < minReplicationCount) { + codeCount < minReplicationCount && + confCount < minReplicationCount) { if (maxWaitTime > 0 && totalWaitTime > maxWaitTime) { LOG.info("desired replication count of {} not achieved but we have hit the max wait time {}" - + " so moving on with replication count for conf key = {} for code key = {} for jar key = ", - minReplicationCount, maxWaitTime, confCount, codeCount, jarCount); + + " so moving on with replication count for conf key = {} for code key = {} for jar key = ", + minReplicationCount, maxWaitTime, confCount, codeCount, jarCount); return; } LOG.debug("Checking if I am still the leader"); @@ -1456,10 +1495,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } LOG.info("desired replication count {} achieved, current-replication-count for conf key = {}," - + " current-replication-count for code key = {}, current-replication-count for jar key = {}", - minReplicationCount, confCount, codeCount, jarCount); + + " current-replication-count for code key = {}, current-replication-count for jar key = {}", + minReplicationCount, confCount, codeCount, jarCount); } - + private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException { assert (base != null); @@ -1473,44 +1512,49 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId, base, topoConf, topo); Map<ExecutorDetails, String> executorsToComponent = new HashMap<>(); - for (Entry<List<Integer>, String> entry: rawExecToComponent.entrySet()) { + for (Entry<List<Integer>, String> entry : rawExecToComponent.entrySet()) { List<Integer> execs = entry.getKey(); ExecutorDetails execDetails = new ExecutorDetails(execs.get(0), execs.get(1)); executorsToComponent.put(execDetails, entry.getValue()); } - + return new TopologyDetails(topoId, topoConf, topo, base.get_num_workers(), executorsToComponent, base.get_launch_time_secs(), - base.get_owner()); + base.get_owner()); } private void updateHeartbeatsFromZkHeartbeat(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) { LOG.debug("Updating heartbeats for {} {} (from ZK heartbeat)", topoId, allExecutors); IStormClusterState state = stormClusterState; - Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port())); + Map<List<Integer>, Map<String, Object>> executorBeats = + StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port())); Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId), - executorBeats, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); + executorBeats, allExecutors, + ObjectReader.getInt(conf.get( + DaemonConfig + .NIMBUS_TASK_TIMEOUT_SECS))); heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache)); } private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) { LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors); Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId); - if(cache == null) { + if (cache == null) { cache = new HashMap<>(); heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache)); } StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), - null, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); + null, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); } /** * Update all the heartbeats for all the topologies' executors. + * * @param existingAssignments current assignments (thrift) * @param topologyToExecutors topology ID to executors. */ private void updateAllHeartbeats(Map<String, Assignment> existingAssignments, - Map<String, Set<List<Integer>>> topologyToExecutors, Set<String> zkHeartbeatTopologies) { - for (Entry<String, Assignment> entry: existingAssignments.entrySet()) { + Map<String, Set<List<Integer>>> topologyToExecutors, Set<String> zkHeartbeatTopologies) { + for (Entry<String, Assignment> entry : existingAssignments.entrySet()) { String topoId = entry.getKey(); if (zkHeartbeatTopologies.contains(topoId)) { updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue()); @@ -1524,42 +1568,43 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat); String topoId = workerHeartbeat.get_storm_id(); Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId); - if(cache == null) { + if (cache == null) { cache = new HashMap<>(); heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache)); } Set<List<Integer>> executors = new HashSet<>(); - for(ExecutorInfo executorInfo : workerHeartbeat.get_executors()) { + for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) { executors.add(Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end())); } StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), executorBeats, executors, - ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); + ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS))); } private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) { workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker); - if(!heartbeatsReadyFlag.get() && !Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) { + if (!heartbeatsReadyFlag.get() && !Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) { heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id()); } } /** - * Decide if the heartbeats is recovered for a master, will wait for all the assignments nodes to recovery, - * every node will take care its node heartbeats reporting. + * Decide if the heartbeats is recovered for a master, will wait for all the assignments nodes to recovery, every node will take care + * its node heartbeats reporting. + * * @return true if all nodes have reported heartbeats or exceeds max-time-out */ private boolean isHeartbeatsRecovered() { - if(heartbeatsReadyFlag.get()) { + if (heartbeatsReadyFlag.get()) { return true; } Set<String> allNodes = new HashSet<>(); - for(Map.Entry<String, Assignment> assignmentEntry: stormClusterState.assignmentsInfo().entrySet()) { + for (Map.Entry<String, Assignment> assignmentEntry : stormClusterState.assignmentsInfo().entrySet()) { allNodes.addAll(assignmentEntry.getValue().get_node_host().keySet()); } boolean isReady = heartbeatsRecoveryStrategy.isReady(allNodes); - if(isReady) { + if (isReady) { heartbeatsReadyFlag.getAndSet(true); } return isReady; @@ -1567,12 +1612,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { /** * Decide if the assignments is synchronized. + * * @return true if assignments have been synchronized from remote state store */ private boolean isAssignmentsRecovered() { return stormClusterState.isAssignmentsBackendSynchronized(); } - + private Set<List<Integer>> aliveExecutors(TopologyDetails td, Set<List<Integer>> allExecutors, Assignment assignment) { String topoId = td.getId(); Map<List<Integer>, Map<String, Object>> hbCache = heartbeatsCache.get().get(topoId); @@ -1581,13 +1627,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { hbCache = new HashMap<>(); } LOG.debug("NEW Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}", - topoId, allExecutors, assignment, hbCache); - + topoId, allExecutors, assignment, hbCache); + int taskLaunchSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)); Set<List<Integer>> ret = new HashSet<>(); Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs(); - for (List<Integer> exec: allExecutors) { + for (List<Integer> exec : allExecutors) { List<Long> longExec = new ArrayList<Long>(exec.size()); for (Integer num : exec) { longExec.add(num.longValue()); @@ -1596,7 +1642,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Long startTime = execToStartTimes.get(longExec); Map<String, Object> executorCache = hbCache.get(StatsUtil.convertExecutor(longExec)); //null isTimedOut means worker never reported any heartbeat - Boolean isTimedOut = executorCache == null ? null : (Boolean)executorCache.get("is-timed-out"); + Boolean isTimedOut = executorCache == null ? null : (Boolean) executorCache.get("is-timed-out"); Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue()); if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut != null && !isTimedOut))) { ret.add(exec); @@ -1606,9 +1652,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } return ret; } - + private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf, - StormTopology topology) + StormTopology topology) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException { assert (base != null); @@ -1617,13 +1663,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { if (compToExecutors != null) { Map<Integer, String> taskInfo = StormCommon.stormTaskInfo(topology, topoConf); Map<String, List<Integer>> compToTaskList = Utils.reverseMap(taskInfo); - for (Entry<String, List<Integer>> entry: compToTaskList.entrySet()) { + for (Entry<String, List<Integer>> entry : compToTaskList.entrySet()) { List<Integer> comps = entry.getValue(); comps.sort(null); Integer numExecutors = compToExecutors.get(entry.getKey()); if (numExecutors != null) { List<List<Integer>> partitioned = Utils.partitionFixed(numExecutors, comps); - for (List<I
<TRUNCATED>
