update supervisor based on revans2 and longdafeng
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dba69b52 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dba69b52 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dba69b52 Branch: refs/heads/master Commit: dba69b528860b29fed435c85ee1f76b09f982105 Parents: d46ed8f 8be5417 Author: xiaojian.fxj <[email protected]> Authored: Thu Mar 31 09:22:13 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Thu Mar 31 13:24:15 2016 +0800 ---------------------------------------------------------------------- .gitignore | 33 ++++++---- CHANGELOG.md | 1 + external/flux/.gitignore | 15 ----- .../storm/daemon/supervisor/SupervisorData.java | 46 +++++++------- .../daemon/supervisor/SupervisorManager.java | 19 ++---- .../daemon/supervisor/SupervisorUtils.java | 27 +++++++-- .../daemon/supervisor/SyncProcessEvent.java | 2 +- .../daemon/supervisor/SyncSupervisorEvent.java | 41 +++++++------ .../supervisor/timer/RunProfilerActions.java | 63 +++++++++----------- .../supervisor/timer/SupervisorHealthCheck.java | 14 +---- .../workermanager/DefaultWorkerManager.java | 41 ++++++------- .../workermanager/IWorkerManager.java | 11 ++-- .../supervisor/workermanager/IWorkerResult.java | 21 ------- 13 files changed, 148 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index 213457d,0000000..da4102c mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@@ -1,234 -1,0 +1,234 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.zookeeper.data.ACL; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class SupervisorData { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class); + + private final Map conf; + private final IContext sharedContext; + private volatile boolean active; - private ISupervisor iSupervisor; - private Utils.UptimeComputer upTime; - private String stormVersion; - private ConcurrentHashMap<String, String> workerThreadPids; // for local mode - private IStormClusterState stormClusterState; - private LocalState localState; - private String supervisorId; - private String assignmentId; - private String hostName; ++ private final ISupervisor iSupervisor; ++ private final Utils.UptimeComputer upTime; ++ private final String stormVersion; ++ private final ConcurrentHashMap<String, String> workerThreadPids; // for local mode ++ private final IStormClusterState stormClusterState; ++ private final LocalState localState; ++ private final String supervisorId; ++ private final String assignmentId; ++ private final String hostName; + // used for reporting used ports when heartbeating - private AtomicReference<Map<Long, LocalAssignment>> currAssignment; - private StormTimer heartbeatTimer; - private StormTimer eventTimer; - private StormTimer blobUpdateTimer; - private Localizer localizer; - private AtomicReference<Map<String, Map<String, Object>>> assignmentVersions; - private AtomicInteger syncRetry; ++ private final AtomicReference<Map<Long, LocalAssignment>> currAssignment; ++ private final StormTimer heartbeatTimer; ++ private final StormTimer eventTimer; ++ private final StormTimer blobUpdateTimer; ++ private final Localizer localizer; ++ private final AtomicReference<Map<String, Map<String, Object>>> assignmentVersions; ++ private final AtomicInteger syncRetry; + private final Object downloadLock = new Object(); - private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions; - private ConcurrentHashSet<String> deadWorkers; ++ private final AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfilerActions; ++ private final ConcurrentHashSet<String> deadWorkers; + private final IWorkerManager workerManager; + + public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) { + this.conf = conf; + this.sharedContext = sharedContext; + this.iSupervisor = iSupervisor; + this.active = true; + this.upTime = Utils.makeUptimeComputer(); + this.stormVersion = VersionInfo.getVersion(); + this.workerThreadPids = new ConcurrentHashMap<String, String>(); + this.deadWorkers = new ConcurrentHashSet(); + + List<ACL> acls = null; + if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { + acls = SupervisorUtils.supervisorZkAcls(); + } + + try { + this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); + } catch (Exception e) { + LOG.error("supervisor can't create stormClusterState"); + throw Utils.wrapInRuntime(e); + } + + try { + this.localState = ConfigUtils.supervisorState(conf); + this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf)); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + this.supervisorId = iSupervisor.getSupervisorId(); + this.assignmentId = iSupervisor.getAssignmentId(); + + try { + this.hostName = Utils.hostname(conf); + } catch (UnknownHostException e) { + throw Utils.wrapInRuntime(e); + } + + this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>()); + + this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler()); + + this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>()); + this.syncRetry = new AtomicInteger(0); - this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>()); ++ this.stormIdToProfilerActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>()); + this.workerManager = Utils.newInstance((String) conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN)); + this.workerManager.prepareWorker(conf, localizer); + } + - public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() { - return stormIdToProfileActions; ++ public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfilerActions() { ++ return stormIdToProfilerActions; + } + - public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) { - this.stormIdToProfileActions.set(stormIdToProfileActions); ++ public void setStormIdToProfilerActions(Map<String, List<ProfileRequest>> stormIdToProfilerActions) { ++ this.stormIdToProfilerActions.set(stormIdToProfilerActions); + } + + public Map getConf() { + return conf; + } + + public IContext getSharedContext() { + return sharedContext; + } + + public boolean isActive() { + return active; + } + + public void setActive(boolean active) { + this.active = active; + } + + public ISupervisor getiSupervisor() { + return iSupervisor; + } + + public Utils.UptimeComputer getUpTime() { + return upTime; + } + + public String getStormVersion() { + return stormVersion; + } + + public ConcurrentHashMap<String, String> getWorkerThreadPids() { + return workerThreadPids; + } + + public IStormClusterState getStormClusterState() { + return stormClusterState; + } + + public LocalState getLocalState() { + return localState; + } + + public String getSupervisorId() { + return supervisorId; + } + + public String getAssignmentId() { + return assignmentId; + } + + public String getHostName() { + return hostName; + } + + public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { + return currAssignment; + } + + public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) { + this.currAssignment.set(currAssignment); + } + + public StormTimer getHeartbeatTimer() { + return heartbeatTimer; + } + + public StormTimer getEventTimer() { + return eventTimer; + } + + public StormTimer getBlobUpdateTimer() { + return blobUpdateTimer; + } + + public Localizer getLocalizer() { + return localizer; + } + + public AtomicInteger getSyncRetry() { + return syncRetry; + } + + public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() { + return assignmentVersions; + } + + public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) { + this.assignmentVersions.set(assignmentVersions); + } + + public ConcurrentHashSet getDeadWorkers() { + return deadWorkers; + } + + public IWorkerManager getWorkerManager() { + return workerManager; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java index d593d3c,0000000..70363fa mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java @@@ -1,103 -1,0 +1,92 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class); + private final EventManager eventManager; + private final EventManager processesEventManager; - private SupervisorData supervisorData; ++ private final SupervisorData supervisorData; + + public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { + this.eventManager = eventManager; + this.supervisorData = supervisorData; + this.processesEventManager = processesEventManager; + } + + public void shutdown() { - LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); ++ LOG.info("Shutting down supervisor {}", supervisorData.getSupervisorId()); + supervisorData.setActive(false); + try { + supervisorData.getHeartbeatTimer().close(); + supervisorData.getEventTimer().close(); + supervisorData.getBlobUpdateTimer().close(); + eventManager.close(); + processesEventManager.close(); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + supervisorData.getStormClusterState().disconnect(); + } + + @Override + public void shutdownAllWorkers() { - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); + IWorkerManager workerManager = supervisorData.getWorkerManager(); - try { - for (String workerId : workerIds) { - workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); - boolean success = workerManager.cleanupWorker(workerId); - if (success){ - supervisorData.getDeadWorkers().remove(workerId); - } - } - } catch (Exception e) { - LOG.error("shutWorker failed"); - throw Utils.wrapInRuntime(e); - } ++ SupervisorUtils.shutdownAllWorkers(supervisorData.getConf(), supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), ++ supervisorData.getDeadWorkers(), workerManager); + } + + @Override + public Map getConf() { + return supervisorData.getConf(); + } + + @Override + public String getId() { + return supervisorData.getSupervisorId(); + } + + @Override + public boolean isWaiting() { + if (!supervisorData.isActive()) { + return true; + } + + if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting() + && processesEventManager.waiting()) { + return true; + } + return false; + } + + public void run() { + shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index a567956,0000000..33a8525 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@@ -1,271 -1,0 +1,286 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; ++import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; ++import java.util.concurrent.ConcurrentHashMap; + +public class SupervisorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + + private static final SupervisorUtils INSTANCE = new SupervisorUtils(); + private static SupervisorUtils _instance = INSTANCE; + public static void setInstance(SupervisorUtils u) { + _instance = u; + } + public static void resetInstance() { + _instance = INSTANCE; + } + + public static Process processLauncher(Map conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { + if (StringUtils.isBlank(user)) { + throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); + } + String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String wl; + if (StringUtils.isNotBlank(wlinitial)) { + wl = wlinitial; + } else { + wl = stormHome + "/bin/worker-launcher"; + } + List<String> commands = new ArrayList<>(); + if (commandPrefix != null){ + commands.addAll(commandPrefix); + } + commands.add(wl); + commands.add(user); + commands.addAll(args); + LOG.info("Running as user: {} command: {}", user, commands); + return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); + } + + public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix) + throws IOException { + int ret = 0; + Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); + if (StringUtils.isNotBlank(logPreFix)) + Utils.readAndLogStream(logPreFix, process.getInputStream()); + try { + process.waitFor(); + } catch (InterruptedException e) { + LOG.info("{} interrupted.", logPreFix); + } + ret = process.exitValue(); + return ret; + } + + public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + String logPrefix = "setup conf for " + dir; + List<String> commands = new ArrayList<>(); + commands.add("code-dir"); + commands.add(dir); + processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + } + } + + public static void rmrAsUser(Map conf, String id, String path) throws IOException { + String user = Utils.getFileOwner(path); + String logPreFix = "rmr " + id; + List<String> commands = new ArrayList<>(); + commands.add("rmr"); + commands.add(path); + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix); + if (Utils.checkFileExists(path)) { + throw new RuntimeException(path + " was not deleted."); + } + } + + /** + * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then + * returns false + * + * @param blobInfo + * @return + */ + public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) { + return Utils.getBoolean(blobInfo.get("uncompress"), false); + } + + /** + * Returns a list of LocalResources based on the blobstore-map passed in + * + * @param blobstoreMap + * @return + */ + public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) { + List<LocalResource> localResourceList = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) { + LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue())); + localResourceList.add(localResource); + } + } + return localResourceList; + } + + /** + * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart. + * + * @param localizer + * @param stormId + * @param conf + */ + public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException { + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); + if (blobstoreMap != null) { + localizer.addReferences(localresources, user, topoName); + } + } + + public static Set<String> readDownLoadedStormIds(Map conf) throws IOException { + Set<String> stormIds = new HashSet<>(); + String path = ConfigUtils.supervisorStormDistRoot(conf); + Collection<String> rets = Utils.readDirContents(path); + for (String ret : rets) { + stormIds.add(URLDecoder.decode(ret)); + } + return stormIds; + } + + public static Collection<String> supervisorWorkerIds(Map conf) { + String workerRoot = ConfigUtils.workerRoot(conf); + return Utils.readDirContents(workerRoot); + } + + public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException { + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot); + String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot); + String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot); + if (!Utils.checkFileExists(stormroot)) + return false; + if (!Utils.checkFileExists(stormcodepath)) + return false; + if (!Utils.checkFileExists(stormconfpath)) + return false; + if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath)) + return true; + return false; + } + + /** + * map from worker id to heartbeat + * + * @param conf + * @return + * @throws Exception + */ + public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { + return _instance.readWorkerHeartbeatsImpl(conf); + } + + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception { + Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); + + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + + for (String workerId : workerIds) { + LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId); + // ATTENTION: whb can be null + workerHeartbeats.put(workerId, whb); + } + return workerHeartbeats; + } + + + /** + * get worker heartbeat by workerId + * + * @param conf + * @param workerId + * @return + * @throws IOException + */ + public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) { + return _instance.readWorkerHeartbeatImpl(conf, workerId); + } + + public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) { + try { + LocalState localState = ConfigUtils.workerState(conf, workerId); + return localState.getWorkerHeartBeat(); + } catch (Exception e) { + LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e); + return null; + } + } + + public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) { + return _instance.isWorkerHbTimedOutImpl(now, whb, conf); + } + + public boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) { - boolean result = false; - if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { - result = true; - } - return result; ++ return (now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + } + + public static String javaCmd(String cmd) { + return _instance.javaCmdImpl(cmd); + } + + public String javaCmdImpl(String cmd) { + String ret = null; + String javaHome = System.getenv().get("JAVA_HOME"); + if (StringUtils.isNotBlank(javaHome)) { + ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd; + } else { + ret = cmd; + } + return ret; + } + - public final static List<ACL> supervisorZkAcls() { ++ public static List<ACL> supervisorZkAcls() { + final List<ACL> acls = new ArrayList<>(); + acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); + acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); + return acls; + } ++ ++ public static void shutdownAllWorkers(Map conf, String supervisorId, Map<String, String> workerThreadPids, Set<String> deadWorkers, ++ IWorkerManager workerManager) { ++ Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); ++ try { ++ for (String workerId : workerIds) { ++ workerManager.shutdownWorker(supervisorId, workerId, workerThreadPids); ++ boolean success = workerManager.cleanupWorker(workerId); ++ if (success) { ++ deadWorkers.remove(workerId); ++ } ++ } ++ } catch (Exception e) { ++ LOG.error("shutWorker failed"); ++ throw Utils.wrapInRuntime(e); ++ } ++ } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index fb4e7ab,0000000..38b79e1 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@@ -1,427 -1,0 +1,427 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + + private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + + private LocalState localState; + private SupervisorData supervisorData; + public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + + private class ProcessExitCallback implements Utils.ExitCodeCallable { + private final String logPrefix; + private final String workerId; + + public ProcessExitCallback(String logPrefix, String workerId) { + this.logPrefix = logPrefix; + this.workerId = workerId; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} exited with code: {}", logPrefix, exitCode); + supervisorData.getDeadWorkers().add(workerId); + return null; + } + } + + public SyncProcessEvent(){ + + } + public SyncProcessEvent(SupervisorData supervisorData) { + init(supervisorData); + } + + public void init(SupervisorData supervisorData){ + this.supervisorData = supervisorData; + this.localState = supervisorData.getLocalState(); + } + + @Override + public void run() { + LOG.debug("Syncing processes"); + try { + Map conf = supervisorData.getConf(); + Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); + + if (assignedExecutors == null) { + assignedExecutors = new HashMap<>(); + } + int now = Time.currentTimeSecs(); + + Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); + + Set<String> keeperWorkerIds = new HashSet<>(); + Set<Integer> keepPorts = new HashSet<>(); + for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat.getState() == State.VALID) { + keeperWorkerIds.add(entry.getKey()); + keepPorts.add(stateHeartbeat.getHeartbeat().get_port()); + } + } + Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts); + Map<Integer, String> newWorkerIds = new HashMap<>(); + for (Integer port : reassignExecutors.keySet()) { + newWorkerIds.put(port, Utils.uuid()); + } - LOG.debug("Syncing processes"); + LOG.debug("Assigned executors: {}", assignedExecutors); + LOG.debug("Allocated: {}", localWorkerStats); + + for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat.getState() != State.VALID) { + LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, + stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); + killWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey()); + } + } + // start new workers + Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors); + + Map<String, Integer> allWorkerPortToIds = new HashMap<>(); + Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); + for (String keeper : keeperWorkerIds) { + allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper)); + } + allWorkerPortToIds.putAll(newWorkerPortToIds); + localState.setApprovedWorkers(allWorkerPortToIds); + waitForWorkersLaunch(conf, newWorkerPortToIds.keySet()); + + } catch (Exception e) { + LOG.error("Failed Sync Process", e); + throw Utils.wrapInRuntime(e); + } + + } + + protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception { + int startTime = Time.currentTimeSecs(); + int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS); + for (String workerId : workerIds) { + LocalState localState = ConfigUtils.workerState(conf, workerId); + while (true) { + LSWorkerHeartbeat hb = localState.getWorkerHeartBeat(); + if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) + break; + LOG.info("{} still hasn't started", workerId); + Time.sleep(500); + } + if (localState.getWorkerHeartBeat() == null) { + LOG.info("Worker {} failed to start", workerId); + } + } + } + + protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) { + Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>(); + reassignExecutors.putAll(assignExecutors); + for (Integer port : keepPorts) { + reassignExecutors.remove(port); + } + return reassignExecutors; + } + + /** + * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead + * + * @param assignedExecutors + * @return + * @throws Exception + */ + public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception { + Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>(); + Map conf = supervisorData.getConf(); + LocalState localState = supervisorData.getLocalState(); + Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf); + Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); + Set<String> approvedIds = new HashSet<>(); + if (approvedWorkers != null) { + approvedIds.addAll(approvedWorkers.keySet()); + } + for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) { + String workerId = entry.getKey(); + LSWorkerHeartbeat whb = entry.getValue(); + State state; + if (whb == null) { + state = State.NOT_STARTED; + } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) { + state = State.DISALLOWED; + } else if (supervisorData.getDeadWorkers().contains(workerId)) { + LOG.info("Worker Process {} has died", workerId); + state = State.TIMED_OUT; + } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) { + state = State.TIMED_OUT; + } else { + state = State.VALID; + } + LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now); + workerIdHbstate.put(workerId, new StateHeartbeat(state, whb)); + } + return workerIdHbstate; + } + + protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) { + LocalAssignment localAssignment = assignedExecutors.get(whb.get_port()); + if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) { + return false; + } + List<ExecutorInfo> executorInfos = new ArrayList<>(); + executorInfos.addAll(whb.get_executors()); + // remove SYSTEM_EXECUTOR_ID + executorInfos.remove(SYSTEM_EXECUTOR_INFO); + List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors(); + + if (localExecuorInfos.size() != executorInfos.size()) + return false; + + for (ExecutorInfo executorInfo : localExecuorInfos){ + if (!localExecuorInfos.contains(executorInfo)) + return false; + } + return true; + } + + /** + * launch a worker in local mode. + */ + protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { + // port this function after porting worker to java + } + + protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, + WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException { + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + writeLogMetadata(stormConf, user, workerId, stormId, port, conf); + ConfigUtils.setWorkerUserWSE(conf, workerId, user); + createArtifactsLink(conf, stormId, port, workerId); + + String logPrefix = "Worker Process " + workerId; + if (deadWorkers != null) + deadWorkers.remove(workerId); + createBlobstoreLinks(conf, stormId, workerId); + ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId); + workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, processExitCallback); + } + + protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException { + + Map<String, Integer> newValidWorkerIds = new HashMap<>(); + Map conf = supervisorData.getConf(); + String supervisorId = supervisorData.getSupervisorId(); + String clusterMode = ConfigUtils.clusterMode(conf); + + for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) { + Integer port = entry.getKey(); + LocalAssignment assignment = entry.getValue(); + String workerId = newWorkerIds.get(port); + String stormId = assignment.get_topology_id(); + WorkerResources resources = assignment.get_resources(); + + // This condition checks for required files exist before launching the worker + if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { + String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId); + String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId); + + LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, + workerId); + + FileUtils.forceMkdir(new File(pidsPath)); + FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId))); + FileUtils.forceMkdir(new File(hbPath)); + + if (clusterMode.endsWith("distributed")) { + launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers()); + } else if (clusterMode.endsWith("local")) { + launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources); + } + newValidWorkerIds.put(workerId, port); + + } else { + LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment, + supervisorData.getSupervisorId(), port, workerId); + } + + } + return newValidWorkerIds; + } + + public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException { + Map data = new HashMap(); + data.put(Config.TOPOLOGY_SUBMITTER_USER, user); + data.put("worker-id", workerId); + + Set<String> logsGroups = new HashSet<>(); + //for supervisor-test + if (stormconf.get(Config.LOGS_GROUPS) != null) { + List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS); + for (String group : groups){ + logsGroups.add(group); + } + } + if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) { + List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS); + logsGroups.addAll(topGroups); + } + data.put(Config.LOGS_GROUPS, logsGroups.toArray()); + + Set<String> logsUsers = new HashSet<>(); + if (stormconf.get(Config.LOGS_USERS) != null) { + List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS); + for (String logUser : logUsers){ + logsUsers.add(logUser); + } + } + if (stormconf.get(Config.TOPOLOGY_USERS) != null) { + List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS); + for (String logUser : topUsers){ + logsUsers.add(logUser); + } + } + data.put(Config.LOGS_USERS, logsUsers.toArray()); + writeLogMetadataToYamlFile(stormId, port, data, conf); + } + + /** + * run worker as user needs the directory to have special permissions or it is insecure + * + * @param stormId + * @param port + * @param data + * @param conf + * @throws IOException + */ + public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException { + File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue()); + + if (!Utils.checkFileExists(file.getParent())) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + FileUtils.forceMkdir(file.getParentFile()); + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath()); + } else { + file.getParentFile().mkdirs(); + } + } + FileWriter writer = new FileWriter(file); + Yaml yaml = new Yaml(); + try { + yaml.dump(data, writer); + }finally { + writer.close(); + } + } + + /** + * Create a symlink from workder directory to its port artifacts directory + * + * @param conf + * @param stormId + * @param port + * @param workerId + */ + protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException { + String workerDir = ConfigUtils.workerRoot(conf, workerId); + String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId); + if (Utils.checkFileExists(workerDir)) { ++ LOG.debug("Creating symlinks for worker-id: {} storm-id: {} to its port artifacts directory", workerId, stormId); + Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port)); + } + } + + /** + * Create symlinks in worker launch directory for all blobs + * + * @param conf + * @param stormId + * @param workerId + * @throws IOException + */ + protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException { + String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + List<String> blobFileNames = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + String ret = null; + if (blobInfo != null && blobInfo.containsKey("localname")) { + ret = (String) blobInfo.get("localname"); + } else { + ret = key; + } + blobFileNames.add(ret); + } + } + List<String> resourceFileNames = new ArrayList<>(); + resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR); + resourceFileNames.addAll(blobFileNames); + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames); + Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR); + for (String fileName : blobFileNames) { + Utils.createSymlink(workerRoot, stormRoot, fileName, fileName); + } + } + + public void killWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{ + workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); + boolean success = workerManager.cleanupWorker(workerId); + if (success){ + supervisorData.getDeadWorkers().remove(workerId); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java index b53db06,0000000..128d229 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java @@@ -1,626 -1,0 +1,633 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + + private EventManager syncSupEventManager; + private EventManager syncProcessManager; + private IStormClusterState stormClusterState; + private LocalState localState; + private SyncProcessEvent syncProcesses; + private SupervisorData supervisorData; + + public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, + EventManager syncProcessManager) { + + this.syncProcesses = syncProcesses; + this.syncSupEventManager = syncSupEventManager; + this.syncProcessManager = syncProcessManager; + this.stormClusterState = supervisorData.getStormClusterState(); + this.localState = supervisorData.getLocalState(); + this.supervisorData = supervisorData; + } + + @Override + public void run() { + try { + Map conf = supervisorData.getConf(); + Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager); + List<String> stormIds = stormClusterState.assignments(syncCallback); + Map<String, Map<String, Object>> assignmentsSnapshot = + getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); + Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + + Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); + Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot); + Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap(); + if (existingAssignment == null) { + existingAssignment = new HashMap<>(); + } + + Map<Integer, LocalAssignment> allAssignment = + readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + + Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); + Set<String> assignedStormIds = new HashSet<>(); + + for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) { + if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) { + newAssignment.put(entry.getKey(), entry.getValue()); + assignedStormIds.add(entry.getValue().get_topology_id()); + } + } + - Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds); ++ Set<String> crashedStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds); + Set<String> downloadedStormIds = new HashSet<>(); + downloadedStormIds.addAll(allDownloadedTopologyIds); - downloadedStormIds.removeAll(srashStormIds); ++ downloadedStormIds.removeAll(crashedStormIds); + + LOG.debug("Synchronizing supervisor"); + LOG.debug("Storm code map: {}", stormcodeMap); + LOG.debug("All assignment: {}", allAssignment); + LOG.debug("New assignment: {}", newAssignment); + LOG.debug("Assigned Storm Ids {}", assignedStormIds); + LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds); - LOG.debug("Checked Downloaded Ids {}", srashStormIds); ++ LOG.debug("Checked Downloaded Ids {}", crashedStormIds); + LOG.debug("Downloaded Ids {}", downloadedStormIds); + LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions); + + // download code first + // This might take awhile + // - should this be done separately from usual monitoring? + // should we only download when topology is assigned to this supervisor? + for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) { + String stormId = entry.getKey(); + if (!downloadedStormIds.contains(stormId) && assignedStormIds.contains(stormId)) { + LOG.info("Downloading code for storm id {}.", stormId); + try { + downloadStormCode(conf, stormId, entry.getValue(), supervisorData.getLocalizer()); + } catch (Exception e) { + if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) { + LOG.warn("Nimbus leader was not available.", e); + } else if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) { + LOG.warn("There was a connection problem with nimbus.", e); + } else { + throw e; + } + } + LOG.info("Finished downloading code for storm id {}", stormId); + } + } + + LOG.debug("Writing new assignment {}", newAssignment); + + Set<Integer> killWorkers = new HashSet<>(); + killWorkers.addAll(existingAssignment.keySet()); + killWorkers.removeAll(newAssignment.keySet()); + for (Integer port : killWorkers) { + supervisorData.getiSupervisor().killedWorker(port); + } + + killExistingWorkersWithChangeInComponents(supervisorData, existingAssignment, newAssignment); + + supervisorData.getiSupervisor().assigned(newAssignment.keySet()); + localState.setLocalAssignmentsMap(newAssignment); + supervisorData.setAssignmentVersions(assignmentsSnapshot); - supervisorData.setStormIdToProfileActions(stormIdToProfilerActions); ++ supervisorData.setStormIdToProfilerActions(stormIdToProfilerActions); + + Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>(); + for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) { + convertNewAssignment.put(entry.getKey().longValue(), entry.getValue()); + } + supervisorData.setCurrAssignment(convertNewAssignment); + // remove any downloaded code that's no longer assigned or active + // important that this happens after setting the local assignment so that + // synchronize-supervisor doesn't try to launch workers for which the + // resources don't exist + if (Utils.isOnWindows()) { + shutdownDisallowedWorkers(); + } + for (String stormId : allDownloadedTopologyIds) { + if (!stormcodeMap.containsKey(stormId)) { + LOG.info("Removing code for storm id {}.", stormId); + rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), true); + } + } + syncProcessManager.add(syncProcesses); + } catch (Exception e) { + LOG.error("Failed to Sync Supervisor", e); + throw new RuntimeException(e); + } + + } + + private void killExistingWorkersWithChangeInComponents(SupervisorData supervisorData, Map<Integer, LocalAssignment> existingAssignment, + Map<Integer, LocalAssignment> newAssignment) throws Exception { - LocalState localState = supervisorData.getLocalState(); - Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); - if (assignedExecutors == null) { - assignedExecutors = new HashMap<>(); - } + int now = Time.currentTimeSecs(); - Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now); ++ Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, existingAssignment, now); + Map<Integer, String> vaildPortToWorkerIds = new HashMap<>(); + for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) { + String workerId = entry.getKey(); + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) { + vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId); + } + } + + Map<Integer, LocalAssignment> intersectAssignment = new HashMap<>(); + for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) { + Integer port = entry.getKey(); + if (existingAssignment.containsKey(port)) { + intersectAssignment.put(port, entry.getValue()); + } + } + + for (Integer port : intersectAssignment.keySet()) { + List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors(); + List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors(); + Set<ExecutorInfo> setExitExecutors = new HashSet<>(existExecutors); + Set<ExecutorInfo> setNewExecutors = new HashSet<>(newExecutors); + if (!setExitExecutors.equals(setNewExecutors)){ + syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port)); + } + } + } + + protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds, + Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception { + Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>(); + for (String stormId : stormIds) { + Integer recordedVersion = -1; + Integer version = stormClusterState.assignmentVersion(stormId, callback); + if (localAssignmentVersion.containsKey(stormId) && localAssignmentVersion.get(stormId) != null) { + recordedVersion = (Integer) localAssignmentVersion.get(stormId).get(IStateStorage.VERSION); + } + if (version == null) { + // ignore + } else if (version == recordedVersion) { + updateAssignmentVersion.put(stormId, localAssignmentVersion.get(stormId)); + } else { + Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback); + updateAssignmentVersion.put(stormId, assignmentVersion); + } + } + return updateAssignmentVersion; + } + + protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception { + Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>(); + for (String stormId : stormIds) { + List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId); + ret.put(stormId, profileRequests); + } + return ret; + } + + protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) { + Map<String, String> stormcodeMap = new HashMap<>(); + for (Map.Entry<String, Map<String, Object>> entry : assignmentsSnapshot.entrySet()) { + Assignment assignment = (Assignment) (entry.getValue().get(IStateStorage.DATA)); + if (assignment != null) { + stormcodeMap.put(entry.getKey(), assignment.get_master_code_dir()); + } + } + return stormcodeMap; + } + + /** + * Remove a reference to a blob when its no longer needed. + * + * @param localizer + * @param stormId + * @param conf + */ + protected void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception { + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo)); + } + } + } + + protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException { + String path = ConfigUtils.supervisorStormDistRoot(conf, stormId); + try { + if (isrmBlobRefs) { + removeBlobReferences(localizer, stormId, conf); + } + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + SupervisorUtils.rmrAsUser(conf, stormId, path); + } else { + Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId)); + } + } catch (Exception e) { + LOG.info("Exception removing: {} ", stormId, e); + } + } + + /** + * Check for the files exists to avoid supervisor crashing Also makes sure there is no necessity for locking" + * + * @param conf + * @param localizer + * @param assignedStormIds + * @param allDownloadedTopologyIds + * @return + */ + protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds) + throws IOException { + Set<String> srashStormIds = new HashSet<>(); + for (String stormId : allDownloadedTopologyIds) { + if (assignedStormIds.contains(stormId)) { + if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { + LOG.debug("Files not present in topology directory"); + rmTopoFiles(conf, stormId, localizer, false); + srashStormIds.add(stormId); + } + } + } + return srashStormIds; + } + + /** + * download code ; two cluster mode: local and distributed + * + * @param conf + * @param stormId + * @param masterCodeDir + * @throws IOException + */ + private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { + String clusterMode = ConfigUtils.clusterMode(conf); + + if (clusterMode.endsWith("distributed")) { + downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer); + } else if (clusterMode.endsWith("local")) { + downloadLocalStormCode(conf, stormId, masterCodeDir, localizer); + } + } + + private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { + + String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null); ++ FileOutputStream codeOutStream = null; ++ FileOutputStream confOutStream = null; + try { + FileUtils.forceMkdir(new File(tmproot)); + String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId); + String stormConfKey = ConfigUtils.masterStormConfKey(stormId); + String codePath = ConfigUtils.supervisorStormCodePath(tmproot); + String confPath = ConfigUtils.supervisorStormConfPath(tmproot); - blobStore.readBlobTo(stormCodeKey, new FileOutputStream(codePath), null); - blobStore.readBlobTo(stormConfKey, new FileOutputStream(confPath), null); ++ codeOutStream = new FileOutputStream(codePath); ++ blobStore.readBlobTo(stormCodeKey, codeOutStream, null); ++ confOutStream = new FileOutputStream(confPath); ++ blobStore.readBlobTo(stormConfKey, confOutStream, null); + } finally { ++ if (codeOutStream != null) ++ codeOutStream.close(); ++ if (confOutStream != null) ++ codeOutStream.close(); + blobStore.shutdown(); + } + FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + + String resourcesJar = resourcesJar(); + + URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR); + + String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + + if (resourcesJar != null) { + LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir); + Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot); + } else if (url != null) { + + LOG.info("Copying resources at {} to {} ", url.toString(), targetDir); + if (url.getProtocol() == "jar") { + JarURLConnection urlConnection = (JarURLConnection) url.openConnection(); + Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot); + } else { + FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir))); + } + } + } + + /** + * Downloading to permanent location is atomic + * + * @param conf + * @param stormId + * @param masterCodeDir + * @param localizer + * @throws Exception + */ + private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { + + String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf); + FileUtils.forceMkdir(new File(tmproot)); + if (Utils.isOnWindows()) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions"); + } + } else { + Utils.restrictPermissions(tmproot); + } + String stormJarKey = ConfigUtils.masterStormJarKey(stormId); + String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId); + String stormConfKey = ConfigUtils.masterStormConfKey(stormId); + String jarPath = ConfigUtils.supervisorStormJarPath(tmproot); + String codePath = ConfigUtils.supervisorStormCodePath(tmproot); + String confPath = ConfigUtils.supervisorStormConfPath(tmproot); + Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore); + Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore); + Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore); + blobStore.shutdown(); + Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot); + downloadBlobsForTopology(conf, confPath, localizer, tmproot); - if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) { ++ if (didDownloadBlobsForTopologySucceed(confPath, tmproot)) { + LOG.info("Successfully downloaded blob resources for storm-id {}", stormId); - FileUtils.forceMkdir(new File(stormroot)); - Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE); - SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); ++ if (Utils.isOnWindows()) { ++ // Files/move with non-empty directory doesn't work well on Windows ++ FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); ++ } else { ++ FileUtils.forceMkdir(new File(stormroot)); ++ Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE); ++ } + } else { + LOG.info("Failed to download blob resources for storm-id ", stormId); + Utils.forceDelete(tmproot); + } + } + + /** + * Assert if all blobs are downloaded for the given topology + * + * @param stormconfPath + * @param targetDir + * @return + */ - protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException { ++ protected boolean didDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException { + Map stormConf = Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormconfPath))); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + List<String> blobFileNames = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + String ret = null; + if (blobInfo != null && blobInfo.containsKey("localname")) { + ret = (String) blobInfo.get("localname"); + } else { + ret = key; + } + blobFileNames.add(ret); + } + } + for (String string : blobFileNames) { + if (!Utils.checkFileExists(string)) + return false; + } + return true; + } + + /** + * Download all blobs listed in the topology configuration for a given topology. + * + * @param conf + * @param stormconfPath + * @param localizer + * @param tmpRoot + */ + protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException { + Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + File userDir = localizer.getLocalUserFileCacheDir(user); + List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); + if (localResourceList.size() > 0) { + if (!userDir.exists()) { + FileUtils.forceMkdir(userDir); + } + try { + List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir); + setupBlobPermission(conf, user, userDir.toString()); + for (LocalizedResource localizedResource : localizedResources) { + File rsrcFilePath = new File(localizedResource.getFilePath()); + String keyName = rsrcFilePath.getName(); + String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName(); + + String symlinkName = null; + if (blobstoreMap != null) { + Map<String, Object> blobInfo = blobstoreMap.get(keyName); + if (blobInfo != null && blobInfo.containsKey("localname")) { + symlinkName = (String) blobInfo.get("localname"); + } else { + symlinkName = keyName; + } + } + Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName); + } + } catch (AuthorizationException authExp) { + LOG.error("AuthorizationException error {}", authExp); + } catch (KeyNotFoundException knf) { + LOG.error("KeyNotFoundException error {}", knf); + } + } + } + + protected void setupBlobPermission(Map conf, String user, String path) throws IOException { + if (Utils.getBoolean(Config.SUPERVISOR_RUN_WORKER_AS_USER, false)) { + String logPrefix = "setup blob permissions for " + path; + SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix); + } + + } + + private String resourcesJar() throws IOException { + + String path = Utils.currentClasspath(); + if (path == null) { + return null; + } + String[] paths = path.split(File.pathSeparator); + List<String> jarPaths = new ArrayList<String>(); + for (String s : paths) { + if (s.endsWith(".jar")) { + jarPaths.add(s); + } + } + + List<String> rtn = new ArrayList<String>(); + int size = jarPaths.size(); + for (int i = 0; i < size; i++) { + if (Utils.zipDoesContainDir(jarPaths.get(i), ConfigUtils.RESOURCES_SUBDIR)) { + rtn.add(jarPaths.get(i)); + } + } + if (rtn.size() == 0) + return null; + + return rtn.get(0); + } + + protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot, + Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) { + try { + Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>(); + for (Map.Entry<String, Map<String, Object>> assignEntry : assignmentsSnapshot.entrySet()) { + String stormId = assignEntry.getKey(); + Assignment assignment = (Assignment) assignEntry.getValue().get(IStateStorage.DATA); + + Map<Integer, LocalAssignment> portTasks = readMyExecutors(stormId, assignmentId, assignment); + + for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) { + + Integer port = entry.getKey(); + + LocalAssignment la = entry.getValue(); + + if (!portLA.containsKey(port)) { + portLA.put(port, la); + } else { + throw new RuntimeException("Should not have multiple topologys assigned to one port"); + } + } + } + retries.set(0); + return portLA; + } catch (RuntimeException e) { + if (retries.get() > 2) { + throw e; + } else { + retries.addAndGet(1); + } + LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get()); + return existingAssignment; + } + } + + protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) { + Map<Integer, LocalAssignment> portTasks = new HashMap<>(); + Map<Long, WorkerResources> slotsResources = new HashMap<>(); + Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources(); + if (nodeInfoWorkerResourcesMap != null) { + for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) { + if (entry.getKey().get_node().equals(assignmentId)) { + Set<Long> ports = entry.getKey().get_port(); + for (Long port : ports) { + slotsResources.put(port, entry.getValue()); + } + } + } + } + Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port(); + if (executorNodePort != null) { + for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) { + if (entry.getValue().get_node().equals(assignmentId)) { + for (Long port : entry.getValue().get_port()) { + LocalAssignment localAssignment = portTasks.get(port.intValue()); + if (localAssignment == null) { + List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>(); + localAssignment = new LocalAssignment(stormId, executors); + if (slotsResources.containsKey(port)) { + localAssignment.set_resources(slotsResources.get(port)); + } + portTasks.put(port.intValue(), localAssignment); + } + List<ExecutorInfo> executorInfoList = localAssignment.get_executors(); + executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), entry.getKey().get(entry.getKey().size() - 1).intValue())); + } + } + } + } + return portTasks; + } + + // I konw it's not a good idea to create SyncProcessEvent, but I only hope SyncProcessEvent is responsible for start/shutdown + // workers, and SyncSupervisorEvent is responsible for download/remove topologys' binary. + protected void shutdownDisallowedWorkers() throws Exception { + LocalState localState = supervisorData.getLocalState(); + Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); + if (assignedExecutors == null) { + assignedExecutors = new HashMap<>(); + } + int now = Time.currentTimeSecs(); + Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now); + LOG.debug("Allocated workers ", assignedExecutors); + for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) { + String workerId = entry.getKey(); + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat.getState() == State.DISALLOWED) { + syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), workerId); + LOG.debug("{}'s state disallowed, so shutdown this worker"); + } + } + } +}
