[
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192876#comment-15192876
]
ASF GitHub Bot commented on STORM-1279:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1184#discussion_r55965454
--- Diff:
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
---
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+ private EventManager syncSupEventManager;
+ private EventManager syncProcessManager;
+ private IStormClusterState stormClusterState;
+ private LocalState localState;
+ private SyncProcessEvent syncProcesses;
+ private SupervisorData supervisorData;
+
+ public SyncSupervisorEvent(SupervisorData supervisorData,
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
+ EventManager syncProcessManager) {
+
+ this.syncProcesses = syncProcesses;
+ this.syncSupEventManager = syncSupEventManager;
+ this.syncProcessManager = syncProcessManager;
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.localState = supervisorData.getLocalState();
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map conf = supervisorData.getConf();
+ Runnable syncCallback = new EventManagerPushCallback(this,
syncSupEventManager);
+ List<String> stormIds =
stormClusterState.assignments(syncCallback);
+ Map<String, Map<String, Object>> assignmentsSnapshot =
+ getAssignmentsSnapshot(stormClusterState, stormIds,
supervisorData.getAssignmentVersions().get(), syncCallback);
+ Map<String, List<ProfileRequest>> stormIdToProfilerActions =
getProfileActions(stormClusterState, stormIds);
+
+ Set<String> allDownloadedTopologyIds =
SupervisorUtils.readDownLoadedStormIds(conf);
+ Map<String, String> stormcodeMap =
readStormCodeLocations(assignmentsSnapshot);
+ Map<Integer, LocalAssignment> existingAssignment =
localState.getLocalAssignmentsMap();
+ if (existingAssignment == null) {
+ existingAssignment = new HashMap<>();
+ }
+
+ Map<Integer, LocalAssignment> allAssignment =
+ readAssignments(assignmentsSnapshot,
existingAssignment, supervisorData.getAssignmentId(),
supervisorData.getSyncRetry());
+
+ Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
+ Set<String> assignedStormIds = new HashSet<>();
+
+ for (Map.Entry<Integer, LocalAssignment> entry :
allAssignment.entrySet()) {
+ if
(supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
+ newAssignment.put(entry.getKey(), entry.getValue());
+
assignedStormIds.add(entry.getValue().get_topology_id());
+ }
+ }
+
+ Set<String> srashStormIds = verifyDownloadedFiles(conf,
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
+ Set<String> downloadedStormIds = new HashSet<>();
+ downloadedStormIds.addAll(allDownloadedTopologyIds);
+ downloadedStormIds.removeAll(srashStormIds);
+
+ LOG.debug("Synchronizing supervisor");
+ LOG.debug("Storm code map: {}", stormcodeMap);
+ LOG.debug("All assignment: {}", allAssignment);
+ LOG.debug("New assignment: {}", newAssignment);
+ LOG.debug("Assigned Storm Ids {}", assignedStormIds);
+ LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
+ LOG.debug("Checked Downloaded Ids {}", srashStormIds);
+ LOG.debug("Downloaded Ids {}", downloadedStormIds);
+ LOG.debug("Storm Ids Profiler Actions {}",
stormIdToProfilerActions);
+ // download code first
+ // This might take awhile
+ // - should this be done separately from usual monitoring?
+ // should we only download when topology is assigned to this
supervisor?
+ for (Map.Entry<String, String> entry :
stormcodeMap.entrySet()) {
+ String stormId = entry.getKey();
+ if (!downloadedStormIds.contains(stormId) &&
assignedStormIds.contains(stormId)) {
+ LOG.info("Downloading code for storm id {}.", stormId);
+ try {
+ downloadStormCode(conf, stormId, entry.getValue(),
supervisorData.getLocalizer());
+ } catch (Exception e) {
+ if
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+ LOG.warn("Nimbus leader was not available.",
e);
+ } else if
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+ LOG.warn("There was a connection problem with
nimbus.", e);
+ } else {
+ throw e;
+ }
+ }
+ LOG.info("Finished downloading code for storm id {}",
stormId);
+ }
+ }
+
+ LOG.debug("Writing new assignment {}", newAssignment);
+
+ Set<Integer> killWorkers = new HashSet<>();
+ killWorkers.addAll(existingAssignment.keySet());
+ killWorkers.removeAll(newAssignment.keySet());
+ for (Integer port : killWorkers) {
+ supervisorData.getiSupervisor().killedWorker(port);
+ }
+
+ killExistingWorkersWithChangeInComponents(supervisorData,
existingAssignment, newAssignment);
+
+
supervisorData.getiSupervisor().assigned(newAssignment.keySet());
+ localState.setLocalAssignmentsMap(newAssignment);
+ supervisorData.setAssignmentVersions(assignmentsSnapshot);
+
supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
+
+ Map<Long, LocalAssignment> convertNewAssignment = new
HashMap<>();
+ for (Map.Entry<Integer, LocalAssignment> entry :
newAssignment.entrySet()) {
+ convertNewAssignment.put(entry.getKey().longValue(),
entry.getValue());
+ }
+ supervisorData.setCurrAssignment(convertNewAssignment);
+ // remove any downloaded code that's no longer assigned or
active
+ // important that this happens after setting the local
assignment so that
+ // synchronize-supervisor doesn't try to launch workers for
which the
+ // resources don't exist
+ if (Utils.isOnWindows()) {
+ shutdownDisallowedWorkers();
+ }
+ for (String stormId : allDownloadedTopologyIds) {
+ if (!stormcodeMap.containsKey(stormId)) {
+ LOG.info("Removing code for storm id {}.", stormId);
+ rmTopoFiles(conf, stormId,
supervisorData.getLocalizer(), true);
+ }
+ }
+ syncProcessManager.add(syncProcesses);
+ } catch (Exception e) {
+ LOG.error("Failed to Sync Supervisor", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private void killExistingWorkersWithChangeInComponents(SupervisorData
supervisorData, Map<Integer, LocalAssignment> existingAssignment,
+ Map<Integer, LocalAssignment> newAssignment) throws Exception {
+ LocalState localState = supervisorData.getLocalState();
+ Map<Integer, LocalAssignment> assignedExecutors =
localState.getLocalAssignmentsMap();
+ if (assignedExecutors == null) {
+ assignedExecutors = new HashMap<>();
+ }
+ int now = Time.currentTimeSecs();
+ Map<String, StateHeartbeat> workerIdHbstate =
syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
+ Map<Integer, String> vaildPortToWorkerIds = new HashMap<>();
+ for (Map.Entry<String, StateHeartbeat> entry :
workerIdHbstate.entrySet()) {
+ String workerId = entry.getKey();
+ StateHeartbeat stateHeartbeat = entry.getValue();
+ if (stateHeartbeat != null && stateHeartbeat.getState() ==
State.VALID) {
+
vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
+ }
+ }
+
+ Map<Integer, LocalAssignment> intersectAssignment = new
HashMap<>();
+ for (Map.Entry<Integer, LocalAssignment> entry :
newAssignment.entrySet()) {
+ Integer port = entry.getKey();
+ if (existingAssignment.containsKey(port)) {
+ intersectAssignment.put(port, entry.getValue());
+ }
+ }
+
+ for (Integer port : intersectAssignment.keySet()) {
+ List<ExecutorInfo> existExecutors =
existingAssignment.get(port).get_executors();
+ List<ExecutorInfo> newExecutors =
newAssignment.get(port).get_executors();
+ if (newExecutors.size() != existExecutors.size()) {
+ syncProcesses.shutWorker(supervisorData,
vaildPortToWorkerIds.get(port));
+ continue;
+ }
+ for (ExecutorInfo executorInfo : newExecutors) {
+ if (!existExecutors.contains(executorInfo)) {
+ syncProcesses.shutWorker(supervisorData,
vaildPortToWorkerIds.get(port));
+ break;
+ }
+ }
+
+ }
+ }
+
+ protected Map<String, Map<String, Object>>
getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String>
stormIds,
+ Map<String, Map<String, Object>> localAssignmentVersion,
Runnable callback) throws Exception {
+ Map<String, Map<String, Object>> updateAssignmentVersion = new
HashMap<>();
+ for (String stormId : stormIds) {
+ Integer recordedVersion = -1;
+ Integer version = stormClusterState.assignmentVersion(stormId,
callback);
+ if (localAssignmentVersion.containsKey(stormId) &&
localAssignmentVersion.get(stormId) != null) {
+ recordedVersion = (Integer)
localAssignmentVersion.get(stormId).get(IStateStorage.VERSION);
+ }
+ if (version == null) {
+ // ignore
+ } else if (version == recordedVersion) {
+ updateAssignmentVersion.put(stormId,
localAssignmentVersion.get(stormId));
+ } else {
+ Map<String, Object> assignmentVersion = (Map<String,
Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
+ updateAssignmentVersion.put(stormId, assignmentVersion);
+ }
+ }
+ return updateAssignmentVersion;
+ }
+
+ protected Map<String, List<ProfileRequest>>
getProfileActions(IStormClusterState stormClusterState, List<String> stormIds)
throws Exception {
+ Map<String, List<ProfileRequest>> ret = new HashMap<String,
List<ProfileRequest>>();
+ for (String stormId : stormIds) {
+ List<ProfileRequest> profileRequests =
stormClusterState.getTopologyProfileRequests(stormId);
+ ret.put(stormId, profileRequests);
+ }
+ return ret;
+ }
+
+ protected Map<String, String> readStormCodeLocations(Map<String,
Map<String, Object>> assignmentsSnapshot) {
+ Map<String, String> stormcodeMap = new HashMap<>();
+ for (Map.Entry<String, Map<String, Object>> entry :
assignmentsSnapshot.entrySet()) {
+ Assignment assignment = (Assignment)
(entry.getValue().get(IStateStorage.DATA));
+ if (assignment != null) {
+ stormcodeMap.put(entry.getKey(),
assignment.get_master_code_dir());
+ }
+ }
+ return stormcodeMap;
+ }
+
+ /**
+ * Remove a reference to a blob when its no longer needed.
+ *
+ * @param localizer
+ * @param stormId
+ * @param conf
+ */
+ protected void removeBlobReferences(Localizer localizer, String
stormId, Map conf) throws Exception {
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String,
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ String user = (String)
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+ if (blobstoreMap != null) {
+ for (Map.Entry<String, Map<String, Object>> entry :
blobstoreMap.entrySet()) {
+ String key = entry.getKey();
+ Map<String, Object> blobInfo = entry.getValue();
+ localizer.removeBlobReference(key, user, topoName,
SupervisorUtils.shouldUncompressBlob(blobInfo));
+ }
+ }
+ }
+
+ protected void rmTopoFiles(Map conf, String stormId, Localizer
localizer, boolean isrmBlobRefs) throws IOException {
+ String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+ try {
+ if (isrmBlobRefs) {
+ removeBlobReferences(localizer, stormId, conf);
+ }
+ if
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ SupervisorUtils.rmrAsUser(conf, stormId, path);
+ } else {
+
Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+ }
+ } catch (Exception e) {
+ LOG.info("Exception removing: {} ", stormId, e);
+ }
+ }
+
+ /**
+ * Check for the files exists to avoid supervisor crashing Also makes
sure there is no necessity for locking"
+ *
+ * @param conf
+ * @param localizer
+ * @param assignedStormIds
+ * @param allDownloadedTopologyIds
+ * @return
+ */
+ protected Set<String> verifyDownloadedFiles(Map conf, Localizer
localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
+ throws IOException {
+ Set<String> srashStormIds = new HashSet<>();
+ for (String stormId : allDownloadedTopologyIds) {
+ if (assignedStormIds.contains(stormId)) {
+ if (!SupervisorUtils.doRequiredTopoFilesExist(conf,
stormId)) {
+ LOG.debug("Files not present in topology directory");
+ rmTopoFiles(conf, stormId, localizer, false);
+ srashStormIds.add(stormId);
+ }
+ }
+ }
+ return srashStormIds;
+ }
+
+ /**
+ * download code ; two cluster mode: local and distributed
+ *
+ * @param conf
+ * @param stormId
+ * @param masterCodeDir
+ * @throws IOException
+ */
+ private void downloadStormCode(Map conf, String stormId, String
masterCodeDir, Localizer localizer) throws Exception {
+ String clusterMode = ConfigUtils.clusterMode(conf);
+
+ if (clusterMode.endsWith("distributed")) {
+ downloadDistributeStormCode(conf, stormId, masterCodeDir,
localizer);
+ } else if (clusterMode.endsWith("local")) {
+ downloadLocalStormCode(conf, stormId, masterCodeDir,
localizer);
+ }
+ }
+
+ private void downloadLocalStormCode(Map conf, String stormId, String
masterCodeDir, Localizer localizer) throws Exception {
+
+ String tmproot = ConfigUtils.supervisorTmpDir(conf) +
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+ String stormroot = ConfigUtils.supervisorStormDistRoot(conf,
stormId);
+ BlobStore blobStore = Utils.getNimbusBlobStore(conf,
masterCodeDir, null);
+ try {
+ FileUtils.forceMkdir(new File(tmproot));
+ String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+ String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+ String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+ String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+ blobStore.readBlobTo(stormCodeKey, new
FileOutputStream(codePath), null);
+ blobStore.readBlobTo(stormConfKey, new
FileOutputStream(confPath), null);
+ } finally {
+ blobStore.shutdown();
+ }
+ FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+ SupervisorUtils.setupStormCodeDir(conf,
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+ ClassLoader classloader =
Thread.currentThread().getContextClassLoader();
+
+ String resourcesJar = resourcesJar();
+
+ URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
+
+ String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR +
ConfigUtils.RESOURCES_SUBDIR;
+
+ if (resourcesJar != null) {
+ LOG.info("Extracting resources from jar at {} to {}",
resourcesJar, targetDir);
+ Utils.extractDirFromJar(resourcesJar,
ConfigUtils.RESOURCES_SUBDIR, stormroot);
+ } else if (url != null) {
+
+ LOG.info("Copying resources at {} to {} ", url.toString(),
targetDir);
+ if (url.getProtocol() == "jar") {
+ JarURLConnection urlConnection = (JarURLConnection)
url.openConnection();
+
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(),
ConfigUtils.RESOURCES_SUBDIR, stormroot);
+ } else {
+ FileUtils.copyDirectory(new File(url.getFile()), (new
File(targetDir)));
+ }
+ }
+ }
+
+ /**
+ * Downloading to permanent location is atomic
+ *
+ * @param conf
+ * @param stormId
+ * @param masterCodeDir
+ * @param localizer
+ * @throws Exception
+ */
+ private void downloadDistributeStormCode(Map conf, String stormId,
String masterCodeDir, Localizer localizer) throws Exception {
+
+ String tmproot = ConfigUtils.supervisorTmpDir(conf) +
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+ String stormroot = ConfigUtils.supervisorStormDistRoot(conf,
stormId);
+ ClientBlobStore blobStore =
Utils.getClientBlobStoreForSupervisor(conf);
+ FileUtils.forceMkdir(new File(tmproot));
+ if (Utils.isOnWindows()) {
+ if
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ throw new RuntimeException("ERROR: Windows doesn't
implement setting the correct permissions");
+ }
+ } else {
+ Utils.restrictPermissions(tmproot);
+ }
+ String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
+ String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+ String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+ String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
+ String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+ String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+ Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath,
blobStore);
+ Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath,
blobStore);
+ Utils.downloadResourcesAsSupervisor(stormConfKey, confPath,
blobStore);
+ blobStore.shutdown();
+ Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR,
tmproot);
+ downloadBlobsForTopology(conf, confPath, localizer, tmproot);
+ if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
+ LOG.info("Successfully downloaded blob resources for storm-id
{}", stormId);
+ FileUtils.forceMkdir(new File(stormroot));
+ Files.move(new File(tmproot).toPath(), new
File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
+ SupervisorUtils.setupStormCodeDir(conf,
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+ } else {
+ LOG.info("Failed to download blob resources for storm-id ",
stormId);
+ Utils.forceDelete(tmproot);
+ }
+ }
+
+ /**
+ * Assert if all blobs are downloaded for the given topology
+ *
+ * @param stormconfPath
+ * @param targetDir
+ * @return
+ */
+ protected boolean IsDownloadBlobsForTopologySucceed(String
stormconfPath, String targetDir) throws IOException {
+ Map stormConf =
Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new
File(stormconfPath)));
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String,
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ List<String> blobFileNames = new ArrayList<>();
+ if (blobstoreMap != null) {
+ for (Map.Entry<String, Map<String, Object>> entry :
blobstoreMap.entrySet()) {
+ String key = entry.getKey();
+ Map<String, Object> blobInfo = entry.getValue();
+ String ret = null;
+ if (blobInfo != null && blobInfo.containsKey("localname"))
{
+ ret = (String) blobInfo.get("localname");
+ } else {
+ ret = key;
+ }
+ blobFileNames.add(ret);
+ }
+ }
+ for (String string : blobFileNames) {
+ if (!Utils.checkFileExists(string))
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Download all blobs listed in the topology configuration for a given
topology.
+ *
+ * @param conf
+ * @param stormconfPath
+ * @param localizer
+ * @param tmpRoot
+ */
+ protected void downloadBlobsForTopology(Map conf, String
stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
+ Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf,
stormconfPath);
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String,
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ String user = (String)
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+ String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+ File userDir = localizer.getLocalUserFileCacheDir(user);
+ List<LocalResource> localResourceList =
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+ if (localResourceList.size() > 0) {
+ if (!userDir.exists()) {
+ FileUtils.forceMkdir(userDir);
+ }
+ try {
+ List<LocalizedResource> localizedResources =
localizer.getBlobs(localResourceList, user, topoName, userDir);
+ setupBlobPermission(conf, user, userDir.toString());
+ for (LocalizedResource localizedResource :
localizedResources) {
+ File rsrcFilePath = new
File(localizedResource.getFilePath());
+ String keyName = rsrcFilePath.getName();
+ String blobSymlinkTargetName = new
File(localizedResource.getCurrentSymlinkPath()).getName();
+
+ String symlinkName = null;
+ if (blobstoreMap != null) {
+ Map<String, Object> blobInfo =
blobstoreMap.get(keyName);
+ if (blobInfo != null &&
blobInfo.containsKey("localname")) {
--- End diff --
this logic was wrapped inside get-blob-localname in clojure. In java, it is
duplicated in three places.
> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: John Fang
> Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
> as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)