// http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
// ----------------------------------------------------------------------
// diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
// index e96395f,0000000..4c08014
// mode 100644,000000..100644
// --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
// +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
// @@@ -1,641 -1,0 +1,631 @@@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.daemon.supervisor;

import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.event.EventManager;
import org.apache.storm.generated.*;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.LocalizedResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.*;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.JarURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Supervisor event that reconciles this supervisor with the assignments stored in the cluster state.
 *
 * <p>Each {@link #run()} reads the current assignments, downloads code for newly assigned topologies,
 * records the new local assignment, removes code of topologies that are no longer listed, and finally
 * queues the {@link SyncProcessEvent} so that workers are started/stopped accordingly.
 */
public class SyncSupervisorEvent implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);

    private final EventManager syncSupEventManager;
    private final EventManager syncProcessManager;
    private final IStormClusterState stormClusterState;
    private final LocalState localState;
    private final SyncProcessEvent syncProcesses;
    private final SupervisorData supervisorData;

    /**
     * @param supervisorData      shared supervisor state; also the source of the cluster state and local state handles
     * @param syncProcesses       event queued on {@code syncProcessManager} at the end of every successful sync
     * @param syncSupEventManager manager this event is re-pushed to by the cluster-state callback
     * @param syncProcessManager  manager that receives {@code syncProcesses}
     */
    public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
            EventManager syncProcessManager) {
        this.syncProcesses = syncProcesses;
        this.syncSupEventManager = syncSupEventManager;
        this.syncProcessManager = syncProcessManager;
        this.stormClusterState = supervisorData.getStormClusterState();
        this.localState = supervisorData.getLocalState();
        this.supervisorData = supervisorData;
    }

    /**
     * Performs one synchronization pass. Any failure is logged and rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        try {
            Map conf = supervisorData.getConf();
            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
            List<String> stormIds = stormClusterState.assignments(syncCallback);
            Map<String, Map<String, Object>> assignmentsSnapshot =
                    getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback);
            Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);

            Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
            Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot);
            Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap();
            if (existingAssignment == null) {
                existingAssignment = new HashMap<>();
            }

            Map<Integer, LocalAssignment> allAssignment =
                    readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());

            // Keep only the slots the ISupervisor implementation confirms.
            Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
            Set<String> assignedStormIds = new HashSet<>();
            for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) {
                if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
                    newAssignment.put(entry.getKey(), entry.getValue());
                    assignedStormIds.add(entry.getValue().get_topology_id());
                }
            }

            // Topologies whose local files turned out to be incomplete were removed and must be downloaded again.
            Set<String> incompleteStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
            Set<String> downloadedStormIds = new HashSet<>(allDownloadedTopologyIds);
            downloadedStormIds.removeAll(incompleteStormIds);

            LOG.debug("Synchronizing supervisor");
            LOG.debug("Storm code map: {}", stormcodeMap);
            LOG.debug("All assignment: {}", allAssignment);
            LOG.debug("New assignment: {}", newAssignment);
            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
            LOG.debug("Checked Downloaded Ids {}", incompleteStormIds);
            LOG.debug("Downloaded Ids {}", downloadedStormIds);
            LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions);

            // download code first
            // This might take awhile
            // - should this be done separately from usual monitoring?
            // should we only download when topology is assigned to this supervisor?
            for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
                String stormId = entry.getKey();
                if (!downloadedStormIds.contains(stormId) && assignedStormIds.contains(stormId)) {
                    LOG.info("Downloading code for storm id {}.", stormId);
                    try {
                        downloadStormCode(conf, stormId, entry.getValue(), supervisorData.getLocalizer());
                        // Only report success when the download did not throw.
                        LOG.info("Finished downloading code for storm id {}", stormId);
                    } catch (Exception e) {
                        // Nimbus being temporarily unreachable is tolerated; anything else aborts this sync pass.
                        if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
                            LOG.warn("Nimbus leader was not available.", e);
                        } else if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
                            LOG.warn("There was a connection problem with nimbus.", e);
                        } else {
                            throw e;
                        }
                    }
                }
            }

            LOG.debug("Writing new assignment {}", newAssignment);

            Set<Integer> killWorkers = new HashSet<>(existingAssignment.keySet());
            killWorkers.removeAll(newAssignment.keySet());
            for (Integer port : killWorkers) {
                supervisorData.getiSupervisor().killedWorker(port);
            }

            killExistingWorkersWithChangeInComponents(supervisorData, existingAssignment, newAssignment);

            supervisorData.getiSupervisor().assigned(newAssignment.keySet());
            localState.setLocalAssignmentsMap(newAssignment);
            supervisorData.setAssignmentVersions(assignmentsSnapshot);
            supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);

            Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>();
            for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
                convertNewAssignment.put(entry.getKey().longValue(), entry.getValue());
            }
            supervisorData.setCurrAssignment(convertNewAssignment);

            // remove any downloaded code that's no longer assigned or active
            // important that this happens after setting the local assignment so that
            // synchronize-supervisor doesn't try to launch workers for which the
            // resources don't exist
            if (Utils.isOnWindows()) {
                shutdownDisallowedWorkers();
            }
            for (String stormId : allDownloadedTopologyIds) {
                if (!stormcodeMap.containsKey(stormId)) {
                    LOG.info("Removing code for storm id {}.", stormId);
                    rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), true);
                }
            }
            syncProcessManager.add(syncProcesses);
        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts down the worker on every port that is present in both assignments but whose executor list differs.
     *
     * @param supervisorData     shared supervisor state
     * @param existingAssignment assignment recorded before this sync, keyed by port
     * @param newAssignment      assignment about to be recorded, keyed by port
     */
    private void killExistingWorkersWithChangeInComponents(SupervisorData supervisorData, Map<Integer, LocalAssignment> existingAssignment,
            Map<Integer, LocalAssignment> newAssignment) throws Exception {
        LocalState localState = supervisorData.getLocalState();
        Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
        if (assignedExecutors == null) {
            assignedExecutors = new HashMap<>();
        }
        int now = Time.currentTimeSecs();
        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);

        // Map each port to the id of the worker that currently reports a VALID heartbeat on it.
        Map<Integer, String> validPortToWorkerIds = new HashMap<>();
        for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
            StateHeartbeat stateHeartbeat = entry.getValue();
            if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) {
                validPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), entry.getKey());
            }
        }

        for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
            Integer port = entry.getKey();
            LocalAssignment existing = existingAssignment.get(port);
            if (existing == null) {
                continue; // port was not assigned before, nothing to compare
            }
            List<ExecutorInfo> existExecutors = existing.get_executors();
            List<ExecutorInfo> newExecutors = entry.getValue().get_executors();
            boolean executorsChanged = newExecutors.size() != existExecutors.size() || !existExecutors.containsAll(newExecutors);
            if (!executorsChanged) {
                continue;
            }
            String workerId = validPortToWorkerIds.get(port);
            if (workerId == null) {
                // No worker with a valid heartbeat on this port, so there is no worker id to shut down.
                LOG.debug("Executors changed on port {} but no valid worker is running there", port);
                continue;
            }
            syncProcesses.shutWorker(supervisorData, workerId);
        }
    }

    /**
     * Builds the assignment snapshot for the given topologies, re-reading an assignment from the cluster state
     * only when its version differs from the locally recorded one.
     *
     * @param stormClusterState      cluster state to read versions and assignments from
     * @param stormIds               topology ids to look up
     * @param localAssignmentVersion previously recorded snapshot, keyed by topology id
     * @param callback               callback passed through to the cluster-state reads
     * @return snapshot keyed by topology id; topologies without a version in the cluster state are omitted
     */
    protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds,
            Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception {
        Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>();
        for (String stormId : stormIds) {
            Integer recordedVersion = -1;
            Integer version = stormClusterState.assignmentVersion(stormId, callback);
            Map<String, Object> recorded = localAssignmentVersion.get(stormId);
            if (recorded != null) {
                recordedVersion = (Integer) recorded.get(IStateStorage.VERSION);
            }
            if (version == null) {
                continue; // no assignment version in the cluster state: ignore this topology
            }
            // equals(), not ==: both sides are boxed Integers and identity only matches for small cached values.
            if (version.equals(recordedVersion)) {
                updateAssignmentVersion.put(stormId, recorded);
            } else {
                Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
                updateAssignmentVersion.put(stormId, assignmentVersion);
            }
        }
        return updateAssignmentVersion;
    }

    /**
     * Reads the profile requests of every given topology from the cluster state.
     *
     * @return map from topology id to whatever {@code getTopologyProfileRequests} returned for it
     */
    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
        Map<String, List<ProfileRequest>> ret = new HashMap<>();
        for (String stormId : stormIds) {
            ret.put(stormId, stormClusterState.getTopologyProfileRequests(stormId));
        }
        return ret;
    }

    /**
     * Extracts the master code directory of every topology in the snapshot that carries assignment data.
     *
     * @return map from topology id to master code directory
     */
    protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) {
        Map<String, String> stormcodeMap = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> entry : assignmentsSnapshot.entrySet()) {
            Assignment assignment = (Assignment) (entry.getValue().get(IStateStorage.DATA));
            if (assignment != null) {
                stormcodeMap.put(entry.getKey(), assignment.get_master_code_dir());
            }
        }
        return stormcodeMap;
    }

    /**
     * Removes the localizer references to every blob listed in the topology's blobstore map.
     *
     * @param localizer localizer holding the blob references
     * @param stormId   topology whose supervisor-side configuration is read
     * @param conf      supervisor configuration
     */
    protected void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception {
        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
        if (blobstoreMap != null) {
            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
                localizer.removeBlobReference(entry.getKey(), user, topoName, SupervisorUtils.shouldUncompressBlob(entry.getValue()));
            }
        }
    }

    /**
     * Deletes the local distribution directory of a topology. This is best effort: failures are logged, not thrown.
     *
     * @param conf         supervisor configuration
     * @param stormId      topology whose files are removed
     * @param localizer    localizer used when blob references are removed
     * @param isrmBlobRefs whether blob references are removed before the files are deleted
     */
    protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException {
        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
        try {
            if (isrmBlobRefs) {
                removeBlobReferences(localizer, stormId, conf);
            }
            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                SupervisorUtils.rmrAsUser(conf, stormId, path);
            } else {
                Utils.forceDelete(path);
            }
        } catch (Exception e) {
            LOG.info("Exception removing: {} ", stormId, e);
        }
    }

    /**
     * Checks that the required files of every assigned, already downloaded topology still exist, and removes
     * the local files of those that are incomplete so they get downloaded again.
     *
     * @param conf                     supervisor configuration
     * @param localizer                localizer handed to {@link #rmTopoFiles}
     * @param assignedStormIds         topologies assigned to this supervisor
     * @param allDownloadedTopologyIds topologies that have a local distribution directory
     * @return ids of the topologies whose local files were found incomplete and removed
     */
    protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
            throws IOException {
        Set<String> incompleteStormIds = new HashSet<>();
        for (String stormId : allDownloadedTopologyIds) {
            if (assignedStormIds.contains(stormId) && !SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
                LOG.debug("Files not present in topology directory");
                rmTopoFiles(conf, stormId, localizer, false);
                incompleteStormIds.add(stormId);
            }
        }
        return incompleteStormIds;
    }

    /**
     * Downloads the code of a topology, dispatching on the cluster mode ("distributed" or "local").
     *
     * @param conf          supervisor configuration
     * @param stormId       topology to download
     * @param masterCodeDir master code directory recorded in the assignment
     * @param localizer     localizer used for blob downloads
     */
    private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
        String clusterMode = ConfigUtils.clusterMode(conf);

        if (clusterMode.endsWith("distributed")) {
            downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer);
        } else if (clusterMode.endsWith("local")) {
            downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
        }
    }

    /**
     * Local-mode download: reads code and conf from the nimbus blob store into a temporary directory, moves it
     * to the topology's distribution directory and copies the resources directory from the classpath.
     */
    private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
        BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null);
        try {
            FileUtils.forceMkdir(new File(tmproot));
            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
            // Close the streams here; nothing visible in this file shows readBlobTo closing them for us.
            try (FileOutputStream codeOut = new FileOutputStream(codePath)) {
                blobStore.readBlobTo(stormCodeKey, codeOut, null);
            }
            try (FileOutputStream confOut = new FileOutputStream(confPath)) {
                blobStore.readBlobTo(stormConfKey, confOut, null);
            }
        } finally {
            blobStore.shutdown();
        }
        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
        SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);

        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
        String resourcesJar = resourcesJar();
        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;

        if (resourcesJar != null) {
            LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
            Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot);
        } else if (url != null) {
            LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
            // equals(), not ==: the protocol string is not guaranteed to be the interned literal.
            if ("jar".equals(url.getProtocol())) {
                JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
                Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot);
            } else {
                FileUtils.copyDirectory(new File(url.getFile()), new File(targetDir));
            }
        }
    }

    /**
     * Distributed-mode download: fetches jar, code and conf into a temporary directory, downloads the topology's
     * blobs, and moves the directory to its permanent location with an atomic move once the blobs are present.
     *
     * @param conf          supervisor configuration
     * @param stormId       topology to download
     * @param masterCodeDir master code directory (unused by this variant)
     * @param localizer     localizer used for blob downloads
     * @throws Exception if any download or file operation fails
     */
    private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);

        ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
        try {
            FileUtils.forceMkdir(new File(tmproot));
            if (Utils.isOnWindows()) {
                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                    throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
                }
            } else {
                Utils.restrictPermissions(tmproot);
            }
            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
            Utils.downloadResourcesAsSupervisor(ConfigUtils.masterStormJarKey(stormId), jarPath, blobStore);
            Utils.downloadResourcesAsSupervisor(ConfigUtils.masterStormCodeKey(stormId), codePath, blobStore);
            Utils.downloadResourcesAsSupervisor(ConfigUtils.masterStormConfKey(stormId), confPath, blobStore);
        } finally {
            // Release the blob store client even when a download fails.
            blobStore.shutdown();
        }

        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
        if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
            LOG.info("Successfully downloaded blob resources for storm-id {}", stormId);
            FileUtils.forceMkdir(new File(stormroot));
            Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
            SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
        } else {
            LOG.info("Failed to download blob resources for storm-id {}", stormId);
            Utils.forceDelete(tmproot);
        }
    }

    /**
     * Reports whether every blob listed in the topology configuration is present in {@code targetDir}.
     *
     * @param stormconfPath path of the compressed topology configuration file
     * @param targetDir     directory the blob names are resolved against
     * @return {@code true} when all expected blob files exist, or when the topology lists no blobs
     */
    protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException {
        Map stormConf = Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormconfPath)));
        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        if (blobstoreMap == null) {
            return true;
        }
        for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
            Map<String, Object> blobInfo = entry.getValue();
            String blobFileName = entry.getKey();
            if (blobInfo != null && blobInfo.containsKey("localname")) {
                blobFileName = (String) blobInfo.get("localname");
            }
            // Resolve against targetDir (the directory passed to createSymlink by downloadBlobsForTopology)
            // instead of checking the bare name, which would be resolved against the JVM working directory.
            if (!Utils.checkFileExists(targetDir + Utils.FILE_PATH_SEPARATOR + blobFileName)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Downloads all blobs listed in the topology configuration and links them into {@code tmpRoot}.
     * Authorization and missing-key failures are logged and otherwise ignored.
     *
     * @param conf          supervisor configuration
     * @param stormconfPath path of the topology configuration file
     * @param localizer     localizer that fetches the blobs
     * @param tmpRoot       directory the symlinks are created in
     */
    protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath);
        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
        File userDir = localizer.getLocalUserFileCacheDir(user);
        List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
        if (localResourceList.isEmpty()) {
            return;
        }
        if (!userDir.exists()) {
            FileUtils.forceMkdir(userDir);
        }
        try {
            List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir);
            setupBlobPermission(conf, user, userDir.toString());
            for (LocalizedResource localizedResource : localizedResources) {
                File rsrcFilePath = new File(localizedResource.getFilePath());
                String keyName = rsrcFilePath.getName();
                String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName();

                // The link is named after the configured "localname", falling back to the blob key.
                String symlinkName = keyName;
                if (blobstoreMap != null) {
                    Map<String, Object> blobInfo = blobstoreMap.get(keyName);
                    if (blobInfo != null && blobInfo.containsKey("localname")) {
                        symlinkName = (String) blobInfo.get("localname");
                    }
                }
                Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName);
            }
        } catch (AuthorizationException authExp) {
            LOG.error("AuthorizationException error", authExp);
        } catch (KeyNotFoundException knf) {
            LOG.error("KeyNotFoundException error", knf);
        }
    }

    /**
     * Runs the worker launcher's "blob" command on {@code path} when workers run as the submitting user.
     *
     * @param conf supervisor configuration
     * @param user user the blobs belong to
     * @param path directory whose permissions are set up
     */
    protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
        // Read the setting from conf, as the other call sites in this class do, rather than passing the key itself.
        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
            String logPrefix = "setup blob permissions for " + path;
            SupervisorUtils.workerLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
        }
    }

    /**
     * Finds the first jar on the current classpath that contains the resources directory.
     *
     * @return path of that jar, or {@code null} when there is no classpath or no such jar
     */
    private String resourcesJar() throws IOException {
        String path = Utils.currentClasspath();
        if (path == null) {
            return null;
        }
        for (String candidate : path.split(File.pathSeparator)) {
            // Return on the first hit; later jars do not need to be opened.
            if (candidate.endsWith(".jar") && Utils.zipDoesContainDir(candidate, ConfigUtils.RESOURCES_SUBDIR)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Collects the local assignments of this supervisor from the snapshot, keyed by port.
     *
     * <p>When reading fails with a {@link RuntimeException} the retry counter is incremented and the existing
     * assignment is returned; once the counter exceeds 2 the exception is rethrown. A successful read resets it.
     *
     * @param assignmentsSnapshot snapshot keyed by topology id
     * @param existingAssignment  assignment returned while retrying
     * @param assignmentId        id of this supervisor within assignments
     * @param retries             retry counter shared across sync passes
     * @return map from port to local assignment
     */
    protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot,
            Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) {
        try {
            Map<Integer, LocalAssignment> portLA = new HashMap<>();
            for (Map.Entry<String, Map<String, Object>> assignEntry : assignmentsSnapshot.entrySet()) {
                String stormId = assignEntry.getKey();
                Assignment assignment = (Assignment) assignEntry.getValue().get(IStateStorage.DATA);

                Map<Integer, LocalAssignment> portTasks = readMyExecutors(stormId, assignmentId, assignment);
                for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
                    Integer port = entry.getKey();
                    if (portLA.containsKey(port)) {
                        throw new RuntimeException("Should not have multiple topologys assigned to one port");
                    }
                    portLA.put(port, entry.getValue());
                }
            }
            retries.set(0);
            return portLA;
        } catch (RuntimeException e) {
            if (retries.get() > 2) {
                throw e;
            } else {
                retries.addAndGet(1);
            }
            LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
            return existingAssignment;
        }
    }

    /**
     * Extracts, from one topology's assignment, the executors placed on this supervisor, grouped by port.
     *
     * @param stormId      topology the assignment belongs to
     * @param assignmentId id of this supervisor within assignments
     * @param assignment   assignment to read
     * @return map from port to the local assignment on that port
     */
    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
        Map<Integer, LocalAssignment> portTasks = new HashMap<>();

        // Worker resources of the slots that live on this supervisor, keyed by port.
        Map<Long, WorkerResources> slotsResources = new HashMap<>();
        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
        if (nodeInfoWorkerResourcesMap != null) {
            for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
                if (entry.getKey().get_node().equals(assignmentId)) {
                    for (Long port : entry.getKey().get_port()) {
                        slotsResources.put(port, entry.getValue());
                    }
                }
            }
        }

        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
        if (executorNodePort != null) {
            for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
                if (!entry.getValue().get_node().equals(assignmentId)) {
                    continue;
                }
                List<Long> executor = entry.getKey();
                for (Long port : entry.getValue().get_port()) {
                    LocalAssignment localAssignment = portTasks.get(port.intValue());
                    if (localAssignment == null) {
                        localAssignment = new LocalAssignment(stormId, new ArrayList<ExecutorInfo>());
                        if (slotsResources.containsKey(port)) {
                            localAssignment.set_resources(slotsResources.get(port));
                        }
                        portTasks.put(port.intValue(), localAssignment);
                    }
                    // The executor key is a list whose first and last elements become the ExecutorInfo bounds.
                    localAssignment.get_executors().add(
                            new ExecutorInfo(executor.get(0).intValue(), executor.get(executor.size() - 1).intValue()));
                }
            }
        }
        return portTasks;
    }

    /**
     * Shuts down every local worker whose heartbeat state is {@code DISALLOWED}.
     *
     * <p>Original author's note: ideally SyncProcessEvent alone would start/shut down workers and
     * SyncSupervisorEvent would only download/remove topology binaries; this method is the exception.
     */
    protected void shutdownDisallowedWorkers() throws Exception {
        LocalState localState = supervisorData.getLocalState();
        Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
        if (assignedExecutors == null) {
            assignedExecutors = new HashMap<>();
        }
        int now = Time.currentTimeSecs();
        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
        LOG.debug("Allocated workers {}", assignedExecutors);
        for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
            String workerId = entry.getKey();
            StateHeartbeat stateHeartbeat = entry.getValue();
            // Same null guard as killExistingWorkersWithChangeInComponents applies to these entries.
            if (stateHeartbeat != null && stateHeartbeat.getState() == State.DISALLOWED) {
                syncProcesses.shutWorker(supervisorData, workerId);
                LOG.debug("{}'s state disallowed, so shutdown this worker", workerId);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index 49f48ef,0000000..f6b3ed6 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@@ -1,57 -1,0 +1,56 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.command.HealthCheck; - import org.apache.storm.daemon.supervisor.ShutdownWork; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorHealthCheck implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); + + private SupervisorData supervisorData; + + public SupervisorHealthCheck(SupervisorData supervisorData) { + this.supervisorData = supervisorData; + } + + @Override + public void run() { + Map conf = supervisorData.getConf(); + int healthCode = HealthCheck.healthCheck(conf); + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + if (healthCode != 0) { + for (String workerId : workerIds) { + try { + SupervisorUtils.shutWorker(supervisorData, workerId); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java index ebb1d5f,0000000..159697f mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@@ -1,106 -1,0 +1,105 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.daemon.supervisor.timer;

import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.SupervisorData;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusLeaderNotFoundException;
import org.apache.storm.utils.Utils;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Periodic task that refreshes the blobs named in the topology configuration of every topology that is both
 * downloaded on and assigned to this supervisor (the original description adds that version files with a
 * suffix are created). The timer that schedules this Runnable is created elsewhere.
 */
public class UpdateBlobs implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);

    private SupervisorData supervisorData;

    public UpdateBlobs(SupervisorData supervisorData) {
        this.supervisorData = supervisorData;
    }

    /**
     * Updates the blobs of each downloaded topology that appears in the current assignment. Transport errors
     * and a missing nimbus leader are logged so the next run can retry; any other failure is rethrown as a
     * runtime exception.
     */
    @Override
    public void run() {
        try {
            Map conf = supervisorData.getConf();
            Set<String> localTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
            AtomicReference<Map<Long, LocalAssignment>> currentAssignment = supervisorData.getCurrAssignment();

            Set<String> assignedTopologyIds = new HashSet<>();
            for (LocalAssignment assignment : currentAssignment.get().values()) {
                assignedTopologyIds.add(assignment.get_topology_id());
            }

            for (String topologyId : localTopologyIds) {
                if (!assignedTopologyIds.contains(topologyId)) {
                    continue; // downloaded but not assigned here: nothing to refresh
                }
                String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
                LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", topologyId, stormRoot);
                updateBlobsForTopology(conf, topologyId, supervisorData.getLocalizer());
            }
        } catch (Exception e) {
            final String retryMessage;
            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
                retryMessage = "Network error while updating blobs, will retry again later";
            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
                retryMessage = "Nimbus unavailable to update blobs, will retry again later";
            } else {
                throw Utils.wrapInRuntime(e);
            }
            LOG.error(retryMessage, e);
        }
    }

    /**
     * Asks the localizer to update every blob listed in the topology's blobstore map. Authorization and
     * missing-key failures are logged and otherwise ignored.
     *
     * @param conf      supervisor configuration
     * @param stormId   topology whose supervisor-side configuration is read
     * @param localizer localizer that performs the update
     * @throws IOException if reading the configuration or updating the blobs fails with an I/O error
     */
    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
        Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        String submitter = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
        List<LocalResource> resources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
        try {
            localizer.updateBlobs(resources, submitter);
        } catch (AuthorizationException authExp) {
            LOG.error("AuthorizationException error", authExp);
        } catch (KeyNotFoundException knf) {
            LOG.error("KeyNotFoundException error", knf);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java index 0000000,28f334b..103c2ce mode 000000,100644..100644 --- a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java @@@ -1,0 -1,84 +1,86 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.storm.metric; + + import clojure.lang.IFn; + import com.codahale.metrics.Gauge; + import com.codahale.metrics.Meter; + import com.codahale.metrics.Metric; + import com.codahale.metrics.MetricRegistry; + import java.util.Map; ++import java.util.concurrent.Callable; ++ + import org.apache.storm.daemon.metrics.MetricsUtils; + import org.apache.storm.daemon.metrics.reporters.PreparableReporter; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + @SuppressWarnings("unchecked") + public class StormMetricsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); + public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); + + public static Meter registerMeter(String name) { + Meter meter = new Meter(); + return register(name, meter); + } + - // TODO: should replace fn to Gauge<Integer> when nimbus.clj is translated to java - public static Gauge<Integer> registerGauge(final String name, final IFn fn) { ++ // TODO: should replace Callable to Gauge<Integer> when nimbus.clj is translated to java ++ public static Gauge<Integer> registerGauge(final String name, final Callable fn) { + Gauge<Integer> gauge = new Gauge<Integer>() { + @Override + public Integer getValue() { + try { + return (Integer) fn.call(); + } catch (Exception e) { + LOG.error("Error getting gauge value for {}", name, e); + } + return 0; + } + }; + return register(name, gauge); + } + + public static void startMetricsReporters(Map stormConf) { + for (PreparableReporter reporter : 
MetricsUtils.getPreparableReporters(stormConf)) { + startMetricsReporter(reporter, stormConf); + } + } + + private static void startMetricsReporter(PreparableReporter reporter, Map stormConf) { + reporter.prepare(StormMetricsRegistry.DEFAULT_REGISTRY, stormConf); + reporter.start(); + LOG.info("Started statistics report plugin..."); + } + + private static <T extends Metric> T register(String name, T metric) { + T ret; + try { + ret = DEFAULT_REGISTRY.register(name, metric); + } catch (IllegalArgumentException e) { + // swallow IllegalArgumentException when the metric exists already + ret = (T) DEFAULT_REGISTRY.getMetrics().get(name); + if (ret == null) { + throw e; + } else { + LOG.warn("Metric {} has already been registered", name); + } + } + return ret; + } + }
