http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java deleted file mode 100644 index 53436ae..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ /dev/null @@ -1,448 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.storm.Config; -import org.apache.storm.container.cgroup.CgroupManager; -import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; -import org.apache.storm.generated.ExecutorInfo; -import org.apache.storm.generated.LSWorkerHeartbeat; -import org.apache.storm.generated.LocalAssignment; -import org.apache.storm.generated.WorkerResources; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.LocalState; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Utils; -import org.eclipse.jetty.util.ConcurrentHashSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.*; - -/** - * 1. to kill are those in allocated that are dead or disallowed - * 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) - * 3. remove any downloaded code that's no longer assigned to this supervisor - * 4. of the rest, figure out what assignments aren't yet satisfied - * 5. generate new worker ids, write new "approved workers" to LS - * 6. create local dir for worker id - * 7. launch new workers (give worker-id, port, and supervisor-id) - * 8. wait for workers launch - */ -public class SyncProcessEvent implements Runnable { - - private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); - - private LocalState localState; - private SupervisorData supervisorData; - public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); - - private class ProcessExitCallback implements Utils.ExitCodeCallable { - private final String logPrefix; - private final String workerId; - - public ProcessExitCallback(String logPrefix, String workerId) { - this.logPrefix = logPrefix; - this.workerId = workerId; - } - - @Override - public Object call() throws Exception { - return null; - } - - @Override - public Object call(int exitCode) { - LOG.info("{} exited with code: {}", logPrefix, exitCode); - supervisorData.getDeadWorkers().add(workerId); - return null; - } - } - - public SyncProcessEvent(){ - - } - public SyncProcessEvent(SupervisorData supervisorData) { - init(supervisorData); - } - - public void init(SupervisorData supervisorData){ - this.supervisorData = supervisorData; - this.localState = supervisorData.getLocalState(); - } - - @Override - public void run() { - LOG.debug("Syncing processes"); - try { - Map conf = supervisorData.getConf(); - Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); - - if (assignedExecutors == null) { - assignedExecutors = new HashMap<>(); - } - - Set<String> assignedStormIds = new HashSet<>(); - for (Map.Entry<Integer, LocalAssignment> entry : assignedExecutors.entrySet()) { - assignedStormIds.add(entry.getValue().get_topology_id()); - } - - int now = Time.currentTimeSecs(); - - Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); - - Set<String> keeperWorkerIds = new HashSet<>(); - Set<Integer> keepPorts = new HashSet<>(); - for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { - StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() == State.VALID) { - keeperWorkerIds.add(entry.getKey()); - keepPorts.add(stateHeartbeat.getHeartbeat().get_port()); - } - } - Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts); - Map<Integer, String> newWorkerIds = new HashMap<>(); - for (Integer port : reassignExecutors.keySet()) { - newWorkerIds.put(port, Utils.uuid()); - } - Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); - - LOG.debug("Assigned executors: {}", assignedExecutors); - LOG.debug("Allocated: {}", localWorkerStats); - - for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { - StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() != State.VALID) { - LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, - stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); - killWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey()); - } - } - - // remove any downloaded code that's no longer assigned or active - for (String downloadedTopologyId : allDownloadedTopologyIds) { - if (!assignedStormIds.contains(downloadedTopologyId)) { - LOG.info("Removing code for storm id {}.", downloadedTopologyId); - SupervisorUtils.rmTopoFiles(conf, downloadedTopologyId, supervisorData.getLocalizer(), true); - } - } - - // start new workers - Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors); - - Map<String, Integer> allWorkerPortToIds = new HashMap<>(); - Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); - for (String keeper : keeperWorkerIds) { - allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper)); - } - allWorkerPortToIds.putAll(newWorkerPortToIds); - localState.setApprovedWorkers(allWorkerPortToIds); - waitForWorkersLaunch(conf, newWorkerPortToIds.keySet()); - - } catch (Exception e) { - LOG.error("Failed Sync Process", e); - throw Utils.wrapInRuntime(e); - } - - } - - protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception { - int startTime = Time.currentTimeSecs(); - int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS); - for (String workerId : workerIds) { - LocalState localState = ConfigUtils.workerState(conf, workerId); - while (true) { - LSWorkerHeartbeat hb = localState.getWorkerHeartBeat(); - if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) - break; - LOG.info("{} still hasn't started", workerId); - Time.sleep(500); - } - if (localState.getWorkerHeartBeat() == null) { - LOG.info("Worker {} failed to start", workerId); - } - } - } - - protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) { - Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>(); - reassignExecutors.putAll(assignExecutors); - for (Integer port : keepPorts) { - reassignExecutors.remove(port); - } - return reassignExecutors; - } - - /** - * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead - * - * @param assignedExecutors - * @return - * @throws Exception - */ - public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception { - Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>(); - Map conf = supervisorData.getConf(); - LocalState localState = supervisorData.getLocalState(); - Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf); - Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); - Set<String> approvedIds = new HashSet<>(); - if (approvedWorkers != null) { - approvedIds.addAll(approvedWorkers.keySet()); - } - for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) { - String workerId = entry.getKey(); - LSWorkerHeartbeat whb = entry.getValue(); - State state; - if (whb == null) { - state = State.NOT_STARTED; - } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) { - state = State.DISALLOWED; - } else if (supervisorData.getDeadWorkers().contains(workerId)) { - LOG.info("Worker Process {} has died", workerId); - state = State.TIMED_OUT; - } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) { - state = State.TIMED_OUT; - } else { - state = State.VALID; - } - LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now); - workerIdHbstate.put(workerId, new StateHeartbeat(state, whb)); - } - return workerIdHbstate; - } - - protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) { - LocalAssignment localAssignment = assignedExecutors.get(whb.get_port()); - if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) { - return false; - } - List<ExecutorInfo> executorInfos = new ArrayList<>(); - executorInfos.addAll(whb.get_executors()); - // remove SYSTEM_EXECUTOR_ID - executorInfos.remove(SYSTEM_EXECUTOR_INFO); - List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors(); - - if (localExecuorInfos.size() != executorInfos.size()) - return false; - - for (ExecutorInfo executorInfo : localExecuorInfos){ - if (!localExecuorInfos.contains(executorInfo)) - return false; - } - return true; - } - - /** - * launch a worker in local mode. - */ - protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { - // port this function after porting worker to java - } - - protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, - WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException { - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - writeLogMetadata(stormConf, user, workerId, stormId, port, conf); - ConfigUtils.setWorkerUserWSE(conf, workerId, user); - createArtifactsLink(conf, stormId, port, workerId); - - String logPrefix = "Worker Process " + workerId; - if (deadWorkers != null) - deadWorkers.remove(workerId); - createBlobstoreLinks(conf, stormId, workerId); - ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId); - workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, processExitCallback); - } - - protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException { - - Map<String, Integer> newValidWorkerIds = new HashMap<>(); - Map conf = supervisorData.getConf(); - String supervisorId = supervisorData.getSupervisorId(); - String clusterMode = ConfigUtils.clusterMode(conf); - - for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) { - Integer port = entry.getKey(); - LocalAssignment assignment = entry.getValue(); - String workerId = newWorkerIds.get(port); - String stormId = assignment.get_topology_id(); - WorkerResources resources = assignment.get_resources(); - - // This condition checks for required files exist before launching the worker - if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { - String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId); - String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId); - - LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, - workerId); - - FileUtils.forceMkdir(new File(pidsPath)); - FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId))); - FileUtils.forceMkdir(new File(hbPath)); - - if (clusterMode.endsWith("distributed")) { - launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers()); - } else if (clusterMode.endsWith("local")) { - launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources); - } - newValidWorkerIds.put(workerId, port); - - } else { - LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment, - supervisorData.getSupervisorId(), port, workerId); - } - - } - return newValidWorkerIds; - } - - public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException { - Map data = new HashMap(); - data.put(Config.TOPOLOGY_SUBMITTER_USER, user); - data.put("worker-id", workerId); - - Set<String> logsGroups = new HashSet<>(); - //for supervisor-test - if (stormconf.get(Config.LOGS_GROUPS) != null) { - List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS); - for (String group : groups){ - logsGroups.add(group); - } - } - if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) { - List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS); - logsGroups.addAll(topGroups); - } - data.put(Config.LOGS_GROUPS, logsGroups.toArray()); - - Set<String> logsUsers = new HashSet<>(); - if (stormconf.get(Config.LOGS_USERS) != null) { - List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS); - for (String logUser : logUsers){ - logsUsers.add(logUser); - } - } - if (stormconf.get(Config.TOPOLOGY_USERS) != null) { - List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS); - for (String logUser : topUsers){ - logsUsers.add(logUser); - } - } - data.put(Config.LOGS_USERS, logsUsers.toArray()); - writeLogMetadataToYamlFile(stormId, port, data, conf); - } - - /** - * run worker as user needs the directory to have special permissions or it is insecure - * - * @param stormId - * @param port - * @param data - * @param conf - * @throws IOException - */ - public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException { - File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue()); - - if (!Utils.checkFileExists(file.getParent())) { - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - FileUtils.forceMkdir(file.getParentFile()); - SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath()); - } else { - file.getParentFile().mkdirs(); - } - } - FileWriter writer = new FileWriter(file); - Yaml yaml = new Yaml(); - try { - yaml.dump(data, writer); - }finally { - writer.close(); - } - } - - /** - * Create a symlink from workder directory to its port artifacts directory - * - * @param conf - * @param stormId - * @param port - * @param workerId - */ - protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException { - String workerDir = ConfigUtils.workerRoot(conf, workerId); - String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId); - if (Utils.checkFileExists(workerDir)) { - LOG.debug("Creating symlinks for worker-id: {} storm-id: {} to its port artifacts directory", workerId, stormId); - Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port)); - } - } - - /** - * Create symlinks in worker launch directory for all blobs - * - * @param conf - * @param stormId - * @param workerId - * @throws IOException - */ - protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException { - String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - String workerRoot = ConfigUtils.workerRoot(conf, workerId); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - List<String> blobFileNames = new ArrayList<>(); - if (blobstoreMap != null) { - for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { - String key = entry.getKey(); - Map<String, Object> blobInfo = entry.getValue(); - String ret = null; - if (blobInfo != null && blobInfo.containsKey("localname")) { - ret = (String) blobInfo.get("localname"); - } else { - ret = key; - } - blobFileNames.add(ret); - } - } - List<String> resourceFileNames = new ArrayList<>(); - resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR); - resourceFileNames.addAll(blobFileNames); - LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames); - Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR); - for (String fileName : blobFileNames) { - Utils.createSymlink(workerRoot, stormRoot, fileName, fileName); - } - } - - public void killWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{ - workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); - boolean success = workerManager.cleanupWorker(workerId); - if (success){ - supervisorData.getDeadWorkers().remove(workerId); - } - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java deleted file mode 100644 index ce467ea..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java +++ /dev/null @@ -1,612 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -import org.apache.commons.io.FileUtils; -import org.apache.storm.Config; -import org.apache.storm.blobstore.BlobStore; -import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.cluster.IStateStorage; -import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.event.EventManager; -import org.apache.storm.generated.*; -import org.apache.storm.localizer.LocalResource; -import org.apache.storm.localizer.LocalizedResource; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.utils.*; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.JarURLConnection; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -public class SyncSupervisorEvent implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); - - private EventManager syncSupEventManager; - private EventManager syncProcessManager; - private IStormClusterState stormClusterState; - private LocalState localState; - private SyncProcessEvent syncProcesses; - private SupervisorData supervisorData; - - public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, - EventManager syncProcessManager) { - - this.syncProcesses = syncProcesses; - this.syncSupEventManager = syncSupEventManager; - this.syncProcessManager = syncProcessManager; - this.stormClusterState = supervisorData.getStormClusterState(); - this.localState = supervisorData.getLocalState(); - this.supervisorData = supervisorData; - } - - @Override - public void run() { - try { - Map conf = supervisorData.getConf(); - Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager); - List<String> stormIds = stormClusterState.assignments(syncCallback); - Map<String, Map<String, Object>> assignmentsSnapshot = - getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); - Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); - - Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); - Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot); - Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap(); - if (existingAssignment == null) { - existingAssignment = new HashMap<>(); - } - - Map<Integer, LocalAssignment> allAssignment = - readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); - - Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); - Set<String> assignedStormIds = new HashSet<>(); - - for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) { - if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) { - newAssignment.put(entry.getKey(), entry.getValue()); - assignedStormIds.add(entry.getValue().get_topology_id()); - } - } - - Set<String> crashedStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds); - Set<String> downloadedStormIds = new HashSet<>(); - downloadedStormIds.addAll(allDownloadedTopologyIds); - downloadedStormIds.removeAll(crashedStormIds); - - LOG.debug("Synchronizing supervisor"); - LOG.debug("Storm code map: {}", stormcodeMap); - LOG.debug("All assignment: {}", allAssignment); - LOG.debug("New assignment: {}", newAssignment); - LOG.debug("Assigned Storm Ids {}", assignedStormIds); - LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds); - LOG.debug("Checked Downloaded Ids {}", crashedStormIds); - LOG.debug("Downloaded Ids {}", downloadedStormIds); - LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions); - - // download code first - // This might take awhile - // - should this be done separately from usual monitoring? - // should we only download when topology is assigned to this supervisor? - for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) { - String stormId = entry.getKey(); - if (!downloadedStormIds.contains(stormId) && assignedStormIds.contains(stormId)) { - LOG.info("Downloading code for storm id {}.", stormId); - try { - downloadStormCode(conf, stormId, entry.getValue(), supervisorData.getLocalizer()); - } catch (Exception e) { - if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) { - LOG.warn("Nimbus leader was not available.", e); - } else if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) { - LOG.warn("There was a connection problem with nimbus.", e); - } else { - throw e; - } - } - LOG.info("Finished downloading code for storm id {}", stormId); - } - } - - LOG.debug("Writing new assignment {}", newAssignment); - - Set<Integer> killWorkers = new HashSet<>(); - killWorkers.addAll(existingAssignment.keySet()); - killWorkers.removeAll(newAssignment.keySet()); - for (Integer port : killWorkers) { - supervisorData.getiSupervisor().killedWorker(port); - } - - supervisorData.getiSupervisor().assigned(newAssignment.keySet()); - localState.setLocalAssignmentsMap(newAssignment); - supervisorData.setAssignmentVersions(assignmentsSnapshot); - supervisorData.setStormIdToProfilerActions(stormIdToProfilerActions); - - Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>(); - for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) { - convertNewAssignment.put(entry.getKey().longValue(), entry.getValue()); - } - supervisorData.setCurrAssignment(convertNewAssignment); - - syncProcessManager.add(syncProcesses); - } catch (Exception e) { - LOG.error("Failed to Sync Supervisor", e); - throw new RuntimeException(e); - } - - } - - protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds, - Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception { - Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>(); - for (String stormId : stormIds) { - Integer recordedVersion = -1; - Integer version = stormClusterState.assignmentVersion(stormId, callback); - if (localAssignmentVersion.containsKey(stormId) && localAssignmentVersion.get(stormId) != null) { - recordedVersion = (Integer) localAssignmentVersion.get(stormId).get(IStateStorage.VERSION); - } - if (version == null) { - // ignore - } else if (version == recordedVersion) { - updateAssignmentVersion.put(stormId, localAssignmentVersion.get(stormId)); - } else { - Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback); - updateAssignmentVersion.put(stormId, assignmentVersion); - } - } - return updateAssignmentVersion; - } - - protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception { - Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>(); - for (String stormId : stormIds) { - List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId); - ret.put(stormId, profileRequests); - } - return ret; - } - - protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) { - Map<String, String> stormcodeMap = new HashMap<>(); - for (Map.Entry<String, Map<String, Object>> entry : assignmentsSnapshot.entrySet()) { - Assignment assignment = (Assignment) (entry.getValue().get(IStateStorage.DATA)); - if (assignment != null) { - stormcodeMap.put(entry.getKey(), assignment.get_master_code_dir()); - } - } - return stormcodeMap; - } - - /** - * Check for the files exists to avoid supervisor crashing Also makes sure there is no necessity for locking" - * - * @param conf - * @param localizer - * @param assignedStormIds - * @param allDownloadedTopologyIds - * @return - */ - protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds) - throws IOException { - Set<String> srashStormIds = new HashSet<>(); - for (String stormId : allDownloadedTopologyIds) { - if (assignedStormIds.contains(stormId)) { - if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { - LOG.debug("Files not present in topology directory"); - SupervisorUtils.rmTopoFiles(conf, stormId, localizer, false); - srashStormIds.add(stormId); - } - } - } - return srashStormIds; - } - - /** - * download code ; two cluster mode: local and distributed - * - * @param conf - * @param stormId - * @param masterCodeDir - * @throws IOException - */ - private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { - String clusterMode = ConfigUtils.clusterMode(conf); - - if (clusterMode.endsWith("distributed")) { - downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer); - } else if (clusterMode.endsWith("local")) { - downloadLocalStormCode(conf, stormId, masterCodeDir, localizer); - } - } - - private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { - - String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); - String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null); - FileOutputStream codeOutStream = null; - FileOutputStream confOutStream = null; - try { - FileUtils.forceMkdir(new File(tmproot)); - String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId); - String stormConfKey = ConfigUtils.masterStormConfKey(stormId); - String codePath = ConfigUtils.supervisorStormCodePath(tmproot); - String confPath = ConfigUtils.supervisorStormConfPath(tmproot); - codeOutStream = new FileOutputStream(codePath); - blobStore.readBlobTo(stormCodeKey, codeOutStream, null); - confOutStream = new FileOutputStream(confPath); - blobStore.readBlobTo(stormConfKey, confOutStream, null); - } finally { - if (codeOutStream != null) - codeOutStream.close(); - if (confOutStream != null) - codeOutStream.close(); - blobStore.shutdown(); - } - FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); - SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); - ClassLoader classloader = Thread.currentThread().getContextClassLoader(); - - String resourcesJar = resourcesJar(); - - URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR); - - String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; - - if (resourcesJar != null) { - LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir); - Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot); - } else if (url != null) { - - LOG.info("Copying resources at {} to {} ", url.toString(), targetDir); - if (url.getProtocol() == "jar") { - JarURLConnection urlConnection = (JarURLConnection) url.openConnection(); - Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot); - } else { - FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir))); - } - } - } - - /** - * Downloading to permanent location is atomic - * - * @param conf - * @param stormId - * @param masterCodeDir - * @param localizer - * @throws Exception - */ - private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception { - - String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); - String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf); - FileUtils.forceMkdir(new File(tmproot)); - if (Utils.isOnWindows()) { - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions"); - } - } else { - Utils.restrictPermissions(tmproot); - } - String stormJarKey = ConfigUtils.masterStormJarKey(stormId); - String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId); - String stormConfKey = ConfigUtils.masterStormConfKey(stormId); - String jarPath = ConfigUtils.supervisorStormJarPath(tmproot); - String codePath = ConfigUtils.supervisorStormCodePath(tmproot); - String confPath = ConfigUtils.supervisorStormConfPath(tmproot); - Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore); - Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore); - Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore); - blobStore.shutdown(); - Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot); - downloadBlobsForTopology(conf, confPath, localizer, tmproot); - downloadDependenciesForTopology(conf, confPath, codePath, localizer, tmproot); - if (areBlobsInBlobStoreMapDownloaded(confPath, tmproot) && - areDependencyFilesDownloaded(codePath, tmproot)) { - LOG.info("Successfully downloaded blob resources for storm-id {}", stormId); - if (Utils.isOnWindows()) { - // Files/move with non-empty directory doesn't work well on Windows - FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); - } else { - FileUtils.forceMkdir(new File(stormroot)); - Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE); - } - } else { - LOG.info("Failed to download blob resources for storm-id ", stormId); - Utils.forceDelete(tmproot); - } - } - - /** - * Assert if all blobs are downloaded for the given topology - * - * @param targetDir - * @param blobFileNames - * @return - */ - protected boolean areBlobsDownloaded(String targetDir, List<String> blobFileNames) throws IOException { - for (String string : blobFileNames) { - if (!Utils.checkFileExists(targetDir, string)) { - LOG.info("Fail to find downloaded file: dir {} filename {}", targetDir, string); - return false; - } - } - return true; - } - - /** - * Assert if all blobs in blobstore map are downloaded for the given topology - * - * @param stormconfPath - * @param targetDir - * @return - */ - protected boolean areBlobsInBlobStoreMapDownloaded(String stormconfPath, String targetDir) throws IOException { - Map stormConf = Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormconfPath))); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - List<String> blobFileNames = new ArrayList<>(); - if (blobstoreMap != null) { - for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { - String key = entry.getKey(); - Map<String, Object> blobInfo = entry.getValue(); - String ret = null; - if (blobInfo != null && blobInfo.containsKey("localname")) { - ret = (String) blobInfo.get("localname"); - } else { - ret = key; - } - blobFileNames.add(ret); - } - } - - return areBlobsDownloaded(targetDir, blobFileNames); - } - - /** - * Assert if all dependencies blobs are downloaded for the given topology - * - * @param stormcodePath - * @param targetDir - * @return - */ - protected boolean areDependencyFilesDownloaded(String stormcodePath, String targetDir) throws IOException { - StormTopology stormCode = ConfigUtils.readSupervisorStormCodeGivenPath(stormcodePath); - List<String> blobFileNames = new ArrayList<>(); - blobFileNames.addAll(stormCode.get_dependency_jars()); - blobFileNames.addAll(stormCode.get_dependency_artifacts()); - - return areBlobsDownloaded(targetDir, blobFileNames); - } - - // FIXME: refactor to downloadBlobsForTopology and downloadDependenciesForTopology to extract common code like 1.x of supervisor.clj - - /** - * Download all blobs listed in the topology configuration for a given topology. - * - * @param conf - * @param stormconfPath - * @param localizer - * @param tmpRoot - */ - protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException { - Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); - File userDir = localizer.getLocalUserFileCacheDir(user); - List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); - if (localResourceList.size() > 0) { - if (!userDir.exists()) { - FileUtils.forceMkdir(userDir); - } - try { - List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir); - setupBlobPermission(conf, user, userDir.toString()); - for (LocalizedResource localizedResource : localizedResources) { - File rsrcFilePath = new File(localizedResource.getFilePath()); - String keyName = rsrcFilePath.getName(); - String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName(); - - String symlinkName = null; - if (blobstoreMap != null) { - Map<String, Object> blobInfo = blobstoreMap.get(keyName); - if (blobInfo != null && blobInfo.containsKey("localname")) { - symlinkName = (String) blobInfo.get("localname"); - } else { - symlinkName = keyName; - } - } - Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName); - } - } catch (AuthorizationException authExp) { - LOG.error("AuthorizationException error {}", authExp); - } catch (KeyNotFoundException knf) { - LOG.error("KeyNotFoundException error {}", knf); - } - } - } - - /** - * Download all dependencies blobs listed in the topology configuration for a given topology. - * - * @param conf - * @param stormconfPath - * @param stormcodePath - * @param localizer - * @param tmpRoot - */ - protected void downloadDependenciesForTopology(Map conf, String stormconfPath, String stormcodePath, Localizer localizer, String tmpRoot) throws IOException { - Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath); - StormTopology stormCode = ConfigUtils.readSupervisorStormCodeGivenPath(stormcodePath); - - List<String> dependencies = new ArrayList<>(); - dependencies.addAll(stormCode.get_dependency_jars()); - dependencies.addAll(stormCode.get_dependency_artifacts()); - - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); - File userDir = localizer.getLocalUserFileCacheDir(user); - - List<LocalResource> localResourceList = new ArrayList<>(); - for (String dependency : dependencies) { - localResourceList.add(new LocalResource(dependency, false)); - } - - if (localResourceList.size() > 0) { - if (!userDir.exists()) { - FileUtils.forceMkdir(userDir); - } - try { - List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir); - setupBlobPermission(conf, user, userDir.toString()); - for (LocalizedResource localizedResource : localizedResources) { - File rsrcFilePath = new File(localizedResource.getFilePath()); - String keyName = rsrcFilePath.getName(); - String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName(); - - String symlinkName = keyName; - Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName); - } - } catch (AuthorizationException authExp) { - LOG.error("AuthorizationException error {}", authExp); - } catch (KeyNotFoundException knf) { - LOG.error("KeyNotFoundException error {}", knf); - } - } - } - - protected void setupBlobPermission(Map conf, String user, String path) throws IOException { - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - String logPrefix = "setup blob permissions for " + path; - SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix); - } - - } - - private String resourcesJar() throws IOException { - - String path = Utils.currentClasspath(); - if (path == null) { - return null; - } - String[] paths = path.split(File.pathSeparator); - List<String> jarPaths = new ArrayList<String>(); - for (String s : paths) { - if (s.endsWith(".jar")) { - jarPaths.add(s); - } - } - - List<String> rtn = new ArrayList<String>(); - int size = jarPaths.size(); - for (int i = 0; i < size; i++) { - if (Utils.zipDoesContainDir(jarPaths.get(i), ConfigUtils.RESOURCES_SUBDIR)) { - rtn.add(jarPaths.get(i)); - } - } - if (rtn.size() == 0) - return null; - - return rtn.get(0); - } - - protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot, - Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) { - try { - Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>(); - for (Map.Entry<String, Map<String, Object>> assignEntry : assignmentsSnapshot.entrySet()) { - String stormId = assignEntry.getKey(); - Assignment assignment = (Assignment) assignEntry.getValue().get(IStateStorage.DATA); - - Map<Integer, LocalAssignment> portTasks = readMyExecutors(stormId, assignmentId, assignment); - - for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) { - - Integer port = entry.getKey(); - - LocalAssignment la = entry.getValue(); - - if (!portLA.containsKey(port)) { - portLA.put(port, la); - } else { - throw new RuntimeException("Should not have multiple topologys assigned to one port"); - } - } - } - retries.set(0); - return portLA; - } catch (RuntimeException e) { - if (retries.get() > 2) { - throw e; - } else { - retries.addAndGet(1); - } - LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get()); - return existingAssignment; - } - } - - protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) { - Map<Integer, LocalAssignment> portTasks = new HashMap<>(); - Map<Long, WorkerResources> slotsResources = new HashMap<>(); - Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources(); - if (nodeInfoWorkerResourcesMap != null) { - for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) { - if (entry.getKey().get_node().equals(assignmentId)) { - Set<Long> ports = entry.getKey().get_port(); - for (Long port : ports) { - slotsResources.put(port, entry.getValue()); - } - } - } - } - Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port(); - if (executorNodePort != null) { - for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) { - if (entry.getValue().get_node().equals(assignmentId)) { - for (Long port : entry.getValue().get_port()) { - LocalAssignment localAssignment = portTasks.get(port.intValue()); - if (localAssignment == null) { - List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>(); - localAssignment = new LocalAssignment(stormId, executors); - if (slotsResources.containsKey(port)) { - localAssignment.set_resources(slotsResources.get(port)); - } - portTasks.put(port.intValue(), localAssignment); - } - List<ExecutorInfo> executorInfoList = localAssignment.get_executors(); - executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), entry.getKey().get(entry.getKey().size() - 1).intValue())); - } - } - } - } - return portTasks; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java deleted file mode 100644 index 3e1e34d..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.supervisor.timer; - -import com.google.common.collect.Lists; -import org.apache.storm.Config; -import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.daemon.supervisor.SupervisorData; -import org.apache.storm.daemon.supervisor.SupervisorUtils; -import org.apache.storm.generated.ProfileAction; -import org.apache.storm.generated.ProfileRequest; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.*; - -public class RunProfilerActions implements Runnable { - private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); - - private Map conf; - private IStormClusterState stormClusterState; - private String hostName; - - private String profileCmd; - - private SupervisorData supervisorData; - - private class ActionExitCallback implements Utils.ExitCodeCallable { - private String stormId; - private ProfileRequest profileRequest; - private String logPrefix; - private boolean stop; - - public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) { - this.stormId = stormId; - this.profileRequest = profileRequest; - this.logPrefix = logPrefix; - this.stop = stop; - } - - @Override - public Object call() throws Exception { - return null; - } - - @Override - public Object call(int exitCode) { - LOG.info("{} profile-action exited for {}", logPrefix, exitCode); - try { - if (stop) - stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); - } catch (Exception e) { - LOG.warn("failed delete profileRequest: " + profileRequest); - } - return null; - } - } - - public RunProfilerActions(SupervisorData supervisorData) { - this.conf = supervisorData.getConf(); - this.stormClusterState = supervisorData.getStormClusterState(); - this.hostName = supervisorData.getHostName(); - String stormHome = System.getProperty("storm.home"); - this.profileCmd = stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + conf.get(Config.WORKER_PROFILER_COMMAND); - this.supervisorData = supervisorData; - } - - @Override - public void run() { - Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get(); - try { - for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { - String stormId = entry.getKey(); - List<ProfileRequest> requests = entry.getValue(); - if (requests != null) { - for (ProfileRequest profileRequest : requests) { - if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { - boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp(); - Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); - String targetDir = ConfigUtils.workerArtifactsRoot(conf, stormId, port.intValue()); - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - - String user = null; - if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) { - user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)); - } - Map<String, String> env = null; - if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) { - env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); - } else { - env = new HashMap<String, String>(); - } - - String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()); - StringBuilder stringBuilder = new StringBuilder(); - - String workerPid = null; - try (FileReader reader = new FileReader(str); BufferedReader br = new BufferedReader(reader)) { - workerPid = br.readLine().trim(); - } - ProfileAction profileAction = profileRequest.get_action(); - String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " "; - - // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted - // The profiler plugin script validates if JVM is recording before starting another recording. - List<String> command = mkCommand(profileAction, stop, workerPid, targetDir); - try { - ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop); - launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix); - } catch (IOException e) { - LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); - } catch (RuntimeException e) { - LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); - } - } - } - } - } - } catch (Exception e) { - LOG.error("Error running profiler actions, will retry again later"); - } - } - - private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment, - final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException { - File targetFile = new File(targetDir); - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - LOG.info("Running as user:{} command:{}", user, commands); - String containerFile = Utils.containerFilePath(targetDir); - if (Utils.checkFileExists(containerFile)) { - SupervisorUtils.rmrAsUser(conf, containerFile, containerFile); - } - String scriptFile = Utils.scriptFilePath(targetDir); - if (Utils.checkFileExists(scriptFile)) { - SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile); - } - String script = Utils.writeScript(targetDir, commands, environment); - List<String> args = new ArrayList<>(); - args.add("profiler"); - args.add(targetDir); - args.add(script); - SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile); - } else { - Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); - } - } - - private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { - if (action == ProfileAction.JMAP_DUMP) { - return jmapDumpCmd(workerPid, targetDir); - } else if (action == ProfileAction.JSTACK_DUMP) { - return jstackDumpCmd(workerPid, targetDir); - } else if (action == ProfileAction.JPROFILE_DUMP) { - return jprofileDump(workerPid, targetDir); - } else if (action == ProfileAction.JVM_RESTART) { - return jprofileJvmRestart(workerPid); - } else if (!stop && action == ProfileAction.JPROFILE_STOP) { - return jprofileStart(workerPid); - } else if (stop && action == ProfileAction.JPROFILE_STOP) { - return jprofileStop(workerPid, targetDir); - } - return Lists.newArrayList(); - } - - private List<String> jmapDumpCmd(String pid, String targetDir) { - return Lists.newArrayList(profileCmd, pid, "jmap", targetDir); - } - - private List<String> jstackDumpCmd(String pid, String targetDir) { - return Lists.newArrayList(profileCmd, pid, "jstack", targetDir); - } - - private List<String> jprofileStart(String pid) { - return Lists.newArrayList(profileCmd, pid, "start"); - } - - private List<String> jprofileStop(String pid, String targetDir) { - return Lists.newArrayList(profileCmd, pid, "stop", targetDir); - } - - private List<String> jprofileDump(String pid, String targetDir) { - return Lists.newArrayList(profileCmd, pid, "dump", targetDir); - } - - private List<String> jprofileJvmRestart(String pid) { - return Lists.newArrayList(profileCmd, pid, "kill"); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index 3ce8f5d..0017092 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@ -18,35 +18,24 @@ package org.apache.storm.daemon.supervisor.timer; -import org.apache.storm.command.HealthCheck; -import org.apache.storm.daemon.supervisor.SupervisorData; -import org.apache.storm.daemon.supervisor.SupervisorUtils; -import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; import java.util.Map; -public class SupervisorHealthCheck implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); +import org.apache.storm.command.HealthCheck; +import org.apache.storm.daemon.supervisor.Supervisor; - private SupervisorData supervisorData; +public class SupervisorHealthCheck implements Runnable { + private final Supervisor supervisor; - public SupervisorHealthCheck(SupervisorData supervisorData) { - this.supervisorData = supervisorData; + public SupervisorHealthCheck(Supervisor supervisor) { + this.supervisor = supervisor; } @Override public void run() { - Map conf = supervisorData.getConf(); - IWorkerManager workerManager = supervisorData.getWorkerManager(); + Map<String, Object> conf = supervisor.getConf(); int healthCode = HealthCheck.healthCheck(conf); if (healthCode != 0) { - SupervisorUtils.shutdownAllWorkers(conf, supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), supervisorData.getDeadWorkers(), - workerManager); + supervisor.shutdownAllWorkers(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index fd357c0..34c5682 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -19,7 +19,7 @@ package org.apache.storm.daemon.supervisor.timer; import org.apache.storm.Config; import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.generated.SupervisorInfo; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; @@ -33,26 +33,26 @@ public class SupervisorHeartbeat implements Runnable { private final IStormClusterState stormClusterState; private final String supervisorId; - private final Map conf; - private final SupervisorData supervisorData; + private final Map<String, Object> conf; + private final Supervisor supervisor; - public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { - this.stormClusterState = supervisorData.getStormClusterState(); - this.supervisorId = supervisorData.getSupervisorId(); - this.supervisorData = supervisorData; + public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) { + this.stormClusterState = supervisor.getStormClusterState(); + this.supervisorId = supervisor.getId(); + this.supervisor = supervisor; this.conf = conf; } - private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) { + private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) { SupervisorInfo supervisorInfo = new SupervisorInfo(); supervisorInfo.set_time_secs(Time.currentTimeSecs()); - supervisorInfo.set_hostname(supervisorData.getHostName()); - supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); + supervisorInfo.set_hostname(supervisor.getHostName()); + supervisorInfo.set_assignment_id(supervisor.getAssignmentId()); List<Long> usedPorts = new ArrayList<>(); - usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); + usedPorts.addAll(supervisor.getCurrAssignment().get().keySet()); supervisorInfo.set_used_ports(usedPorts); - List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); + List metaDatas = (List)supervisor.getiSupervisor().getMetadata(); List<Long> portList = new ArrayList<>(); if (metaDatas != null){ for (Object data : metaDatas){ @@ -64,8 +64,8 @@ public class SupervisorHeartbeat implements Runnable { supervisorInfo.set_meta(portList); supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META)); - supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); - supervisorInfo.set_version(supervisorData.getStormVersion()); + supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime()); + supervisorInfo.set_version(supervisor.getStormVersion()); supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); return supervisorInfo; } @@ -81,7 +81,7 @@ public class SupervisorHeartbeat implements Runnable { @Override public void run() { - SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisorData); + SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor); stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo); } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java index 159697f..0b6d996 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@ -18,7 +18,7 @@ package org.apache.storm.daemon.supervisor.timer; import org.apache.storm.Config; -import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.daemon.supervisor.SupervisorUtils; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; @@ -47,18 +47,18 @@ public class UpdateBlobs implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class); - private SupervisorData supervisorData; + private Supervisor supervisor; - public UpdateBlobs(SupervisorData supervisorData) { - this.supervisorData = supervisorData; + public UpdateBlobs(Supervisor supervisor) { + this.supervisor = supervisor; } @Override public void run() { try { - Map conf = supervisorData.getConf(); - Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); - AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisorData.getCurrAssignment(); + Map<String, Object> conf = supervisor.getConf(); + Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf); + AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment(); Set<String> assignedStormIds = new HashSet<>(); for (LocalAssignment localAssignment : newAssignment.get().values()) { assignedStormIds.add(localAssignment.get_topology_id()); @@ -67,7 +67,7 @@ public class UpdateBlobs implements Runnable { if (assignedStormIds.contains(stormId)) { String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot); - updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer()); + updateBlobsForTopology(conf, stormId, supervisor.getLocalizer()); } } } catch (Exception e) {