http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java deleted file mode 100644 index f549d0f..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java +++ /dev/null @@ -1,429 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor.workermanager; - -import org.apache.commons.lang.StringUtils; -import org.apache.storm.Config; -import org.apache.storm.ProcessSimulator; -import org.apache.storm.container.cgroup.CgroupManager; -import org.apache.storm.daemon.supervisor.SupervisorUtils; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.generated.WorkerResources; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.*; - -public class DefaultWorkerManager implements IWorkerManager { - - private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); - - private Map conf; - private CgroupManager resourceIsolationManager; - private boolean runWorkerAsUser; - - @Override - public void prepareWorker(Map conf, Localizer localizer) { - this.conf = conf; - if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { - try { - this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); - this.resourceIsolationManager.prepare(conf); - LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); - } catch (IOException e) { - throw Utils.wrapInRuntime(e); - } - } else { - this.resourceIsolationManager = null; - } - this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); - } - - @Override - public void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, - Utils.ExitCodeCallable workerExitCallback) { - try { - - String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); - String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); - String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); - String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); - - String stormLogDir = ConfigUtils.getLogDir(); - String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); - - String stormLog4j2ConfDir; - if (StringUtils.isNotBlank(stormLogConfDir)) { - if (Utils.isAbsolutePath(stormLogConfDir)) { - stormLog4j2ConfDir = stormLogConfDir; - } else { - stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; - } - } else { - stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; - } - - String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - - String jlp = jlp(stormRoot, conf); - - String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); - - StormTopology stormTopology = ConfigUtils.readSupervisorTopology(conf, stormId); - - List<String> dependencyLocations = new ArrayList<>(); - if (stormTopology.get_dependency_jars() != null) { - for (String dependency : stormTopology.get_dependency_jars()) { - dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath()); - } - } - - if (stormTopology.get_dependency_artifacts() != null) { - for (String dependency : stormTopology.get_dependency_artifacts()) { - dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath()); - } - } - - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - - String workerClassPath = getWorkerClassPath(stormJar, stormConf, dependencyLocations); - - Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS); - List<String> topGcOpts = new ArrayList<>(); - if (topGcOptsObject instanceof String) { - topGcOpts.add((String) topGcOptsObject); - } else if (topGcOptsObject instanceof List) { - topGcOpts.addAll((List<String>) topGcOptsObject); - } - - int memOnheap = 0; - if (resources.get_mem_on_heap() > 0) { - memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); - } else { - // set the default heap memory size for supervisor-test - memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); - } - - int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); - - int cpu = (int) Math.ceil(resources.get_cpu()); - - List<String> gcOpts = null; - - if (topGcOpts.size() > 0) { - gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap); - } else { - gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap); - } - - Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); - List<String> topoWorkerLogwriterChildopts = new ArrayList<>(); - if (topoWorkerLogwriterObject instanceof String) { - topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject); - } else if (topoWorkerLogwriterObject instanceof List) { - topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject); - } - - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - - String logfileName = "worker.log"; - - String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf); - - String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY); - if (loggingSensitivity == null) { - loggingSensitivity = "S3"; - } - - List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); - - List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); - - List<String> workerProfilerChildopts = null; - if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { - workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); - } else { - workerProfilerChildopts = new ArrayList<>(); - } - - Map<String, String> topEnvironment = new HashMap<String, String>(); - Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); - if (environment != null) { - topEnvironment.putAll(environment); - } - topEnvironment.put("LD_LIBRARY_PATH", jlp); - - String log4jConfigurationFile = null; - if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { - log4jConfigurationFile = "file:///" + stormLog4j2ConfDir; - } else { - log4jConfigurationFile = stormLog4j2ConfDir; - } - log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml"; - - List<String> commandList = new ArrayList<>(); - commandList.add(SupervisorUtils.javaCmd("java")); - commandList.add("-cp"); - commandList.add(workerClassPath); - commandList.addAll(topoWorkerLogwriterChildopts); - commandList.add("-Dlogfile.name=" + logfileName); - commandList.add("-Dstorm.home=" + stormHome); - commandList.add("-Dworkers.artifacts=" + workersArtifacets); - commandList.add("-Dstorm.id=" + stormId); - commandList.add("-Dworker.id=" + workerId); - commandList.add("-Dworker.port=" + port); - commandList.add("-Dstorm.log.dir=" + stormLogDir); - commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); - commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); - commandList.add("org.apache.storm.LogWriter"); - - commandList.add(SupervisorUtils.javaCmd("java")); - commandList.add("-server"); - commandList.addAll(workerChildopts); - commandList.addAll(topWorkerChildopts); - commandList.addAll(gcOpts); - commandList.addAll(workerProfilerChildopts); - commandList.add("-Djava.library.path=" + jlp); - commandList.add("-Dlogfile.name=" + logfileName); - commandList.add("-Dstorm.home=" + stormHome); - commandList.add("-Dworkers.artifacts=" + workersArtifacets); - commandList.add("-Dstorm.conf.file=" + stormConfFile); - commandList.add("-Dstorm.options=" + stormOptions); - commandList.add("-Dstorm.log.dir=" + stormLogDir); - commandList.add("-Djava.io.tmpdir=" + workerTmpDir); - commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); - commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); - commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); - commandList.add("-Dstorm.id=" + stormId); - commandList.add("-Dworker.id=" + workerId); - commandList.add("-Dworker.port=" + port); - commandList.add("-cp"); - commandList.add(workerClassPath); - commandList.add("org.apache.storm.daemon.worker"); - commandList.add(stormId); - commandList.add(assignmentId); - commandList.add(String.valueOf(port)); - commandList.add(workerId); - - // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB)))) - if (resourceIsolationManager != null) { - int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB))); - int memoryValue = memoffheap + memOnheap + cGroupMem; - int cpuValue = cpu; - Map<String, Number> map = new HashMap<>(); - map.put("cpu", cpuValue); - map.put("memory", memoryValue); - resourceIsolationManager.reserveResourcesForWorker(workerId, map); - commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList); - } - - LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList)); - - String logPrefix = "Worker Process " + workerId; - String workerDir = ConfigUtils.workerRoot(conf, workerId); - - if (runWorkerAsUser) { - List<String> args = new ArrayList<>(); - args.add("worker"); - args.add(workerDir); - args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); - List<String> commandPrefix = null; - if (resourceIsolationManager != null) - commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId); - SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir)); - } else { - Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir)); - } - } catch (IOException e) { - throw Utils.wrapInRuntime(e); - } - } - - @Override - public void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) { - try { - LOG.info("Shutting down {}:{}", supervisorId, workerId); - Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); - Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); - String user = ConfigUtils.getWorkerUser(conf, workerId); - String threadPid = workerThreadPids.get(workerId); - if (StringUtils.isNotBlank(threadPid)) { - ProcessSimulator.killProcess(threadPid); - } - - for (String pid : pids) { - if (runWorkerAsUser) { - List<String> commands = new ArrayList<>(); - commands.add("signal"); - commands.add(pid); - commands.add("15"); - String logPrefix = "kill -15 " + pid; - SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); - } else { - Utils.killProcessWithSigTerm(pid); - } - } - - if (pids.size() > 0) { - LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); - Time.sleepSecs(shutdownSleepSecs); - } - - for (String pid : pids) { - if (runWorkerAsUser) { - List<String> commands = new ArrayList<>(); - commands.add("signal"); - commands.add(pid); - commands.add("9"); - String logPrefix = "kill -9 " + pid; - SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); - } else { - Utils.forceKillProcess(pid); - } - String path = ConfigUtils.workerPidPath(conf, workerId, pid); - if (runWorkerAsUser) { - SupervisorUtils.rmrAsUser(conf, workerId, path); - } else { - try { - LOG.debug("Removing path {}", path); - new File(path).delete(); - } catch (Exception e) { - // on windows, the supervisor may still holds the lock on the worker directory - // ignore - } - } - } - LOG.info("Shut down {}:{}", supervisorId, workerId); - } catch (Exception e) { - throw Utils.wrapInRuntime(e); - } - } - - @Override - public boolean cleanupWorker(String workerId) { - try { - //clean up for resource isolation if enabled - if (resourceIsolationManager != null) { - resourceIsolationManager.releaseResourcesForWorker(workerId); - } - //Always make sure to clean up everything else before worker directory - //is removed since that is what is going to trigger the retry for cleanup - String workerRoot = ConfigUtils.workerRoot(conf, workerId); - if (Utils.checkFileExists(workerRoot)) { - if (runWorkerAsUser) { - SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); - } else { - Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); - } - ConfigUtils.removeWorkerUserWSE(conf, workerId); - } - return true; - } catch (IOException e) { - LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); - } catch (RuntimeException e) { - LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); - } - return false; - } - - protected String jlp(String stormRoot, Map conf) { - String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; - String os = System.getProperty("os.name").replaceAll("\\s+", "_"); - String arch = System.getProperty("os.arch"); - String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; - String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH); - return ret; - } - - protected String getWorkerClassPath(String stormJar, Map stormConf, List<String> dependencyLocations) { - List<String> topoClasspath = new ArrayList<>(); - Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); - - // Will be populated only if STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED is set on Nimbus. - // Allowed for extreme debugging. - Object topologyClasspathFirst = stormConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING); - List<String> firstClasspathList = new ArrayList<>(); - if(topologyClasspathFirst instanceof List) { - firstClasspathList.addAll((List<String>)topologyClasspathFirst); - } else if (topologyClasspathFirst instanceof String) { - firstClasspathList.add((String) topologyClasspathFirst); - } - LOG.debug("Topology Classpath Prefix: {}", firstClasspathList); - - if (object instanceof List) { - topoClasspath.addAll((List<String>) object); - } else if (object instanceof String) { - topoClasspath.add((String) object); - } - LOG.debug("Topology specific classpath is {}", object); - - String classPath = Utils.addToClasspath(firstClasspathList, Arrays.asList(Utils.workerClasspath())); - String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); - String classDepAddedPath = Utils.addToClasspath(classAddPath, dependencyLocations); - return Utils.addToClasspath(classDepAddedPath, topoClasspath); - } - - private static String substituteChildOptsInternal(String string, String workerId, String stormId, Long port, int memOnheap) { - if (StringUtils.isNotBlank(string)){ - string = string.replace("%ID%", String.valueOf(port)); - string = string.replace("%WORKER-ID%", workerId); - string = string.replace("%TOPOLOGY-ID%", stormId); - string = string.replace("%WORKER-PORT%", String.valueOf(port)); - string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); - } - return string; - } - - /** - * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" - * - * @param value - * @param workerId - * @param stormId - * @param port - * @param memOnheap - */ - public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { - List<String> rets = new ArrayList<>(); - if (value instanceof String) { - String string = substituteChildOptsInternal((String) value, workerId, stormId, port, memOnheap); - if (StringUtils.isNotBlank(string)){ - String[] strings = string.split("\\s+"); - rets.addAll(Arrays.asList(strings)); - } - } else if (value instanceof List) { - List<Object> objects = (List<Object>) value; - for (Object object : objects) { - String str = substituteChildOptsInternal((String) object, workerId, stormId, port, memOnheap); - if (StringUtils.isNotBlank(str)){ - rets.add(str); - } - } - } - return rets; - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java deleted file mode 100644 index e62b9d8..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor.workermanager; - -import org.apache.storm.generated.WorkerResources; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.utils.Utils; - -import java.util.List; -import java.util.Map; - -public interface IWorkerManager { - void prepareWorker(Map conf, Localizer localizer); - - void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, - Utils.ExitCodeCallable workerExitCallback); - void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids); - - boolean cleanupWorker(String workerId); -} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java new file mode 100644 index 0000000..7887281 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java @@ -0,0 +1,432 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.localizer; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.JarURLConnection; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.daemon.supervisor.AdvancedFSOps; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * This is a wrapper around the Localizer class that provides the desired + * async interface to Slot. + */ +public class AsyncLocalizer implements ILocalizer, Shutdownable { + /** + * A future that has already completed. + */ + private static class AllDoneFuture implements Future<Void> { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public Void get() { + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) { + return null; + } + + } + + private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class); + + private final Localizer _localizer; + private final ExecutorService _execService; + private final boolean _isLocalMode; + private final Map<String, Object> _conf; + private final Map<String, LocalDownloadedResource> _basicPending; + private final Map<String, LocalDownloadedResource> _blobPending; + private final AdvancedFSOps _fsOps; + + private class DownloadBaseBlobsDistributed implements Callable<Void> { + protected final String _topologyId; + protected final File _stormRoot; + + public DownloadBaseBlobsDistributed(String topologyId) throws IOException { + _topologyId = topologyId; + _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId)); + } + + protected void downloadBaseBlobs(File tmproot) throws Exception { + String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId); + String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId); + String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId); + String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath()); + String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()); + String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()); + _fsOps.forceMkdir(tmproot); + _fsOps.restrictDirectoryPermissions(tmproot); + ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(_conf); + try { + Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore); + Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore); + Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore); + } finally { + blobStore.shutdown(); + } + Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot); + } + + @Override + public Void call() throws Exception { + try { + if (_fsOps.fileExists(_stormRoot)) { + if (!_fsOps.supportsAtomicDirectoryMove()) { + LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId); + _fsOps.deleteIfExists(_stormRoot); + } else { + LOG.warn("{} already downloaded blobs, skipping", _topologyId); + return null; + } + } + boolean deleteAll = true; + String tmproot = ConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); + File tr = new File(tmproot); + try { + downloadBaseBlobs(tr); + _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot); + _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot); + deleteAll = false; + } finally { + if (deleteAll) { + LOG.warn("Failed to download basic resources for topology-id {}", _topologyId); + _fsOps.deleteIfExists(tr); + _fsOps.deleteIfExists(_stormRoot); + } + } + return null; + } catch (Exception e) { + LOG.warn("Caught Exception While Downloading (rethrowing)... ", e); + throw e; + } + } + } + + private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed { + + public DownloadBaseBlobsLocal(String topologyId) throws IOException { + super(topologyId); + } + + @Override + protected void downloadBaseBlobs(File tmproot) throws Exception { + _fsOps.forceMkdir(tmproot); + String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId); + String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId); + File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath())); + File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath())); + BlobStore blobStore = Utils.getNimbusBlobStore(_conf, null); + try { + try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){ + blobStore.readBlobTo(stormCodeKey, codeOutStream, null); + } + try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) { + blobStore.readBlobTo(stormConfKey, confOutStream, null); + } + } finally { + blobStore.shutdown(); + } + + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + String resourcesJar = AsyncLocalizer.resourcesJar(); + URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR); + + String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + + if (resourcesJar != null) { + LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir); + Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, _stormRoot); + } else if (url != null) { + LOG.info("Copying resources at {} to {} ", url.toString(), targetDir); + if ("jar".equals(url.getProtocol())) { + JarURLConnection urlConnection = (JarURLConnection) url.openConnection(); + Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, _stormRoot); + } else { + _fsOps.copyDirectory(new File(url.getFile()), new File(targetDir)); + } + } + } + } + + private class DownloadBlobs implements Callable<Void> { + private final String _topologyId; + + public DownloadBlobs(String topologyId) { + _topologyId = topologyId; + } + + @Override + public Void call() throws Exception { + try { + String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId); + Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId); + + @SuppressWarnings("unchecked") + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + + List<LocalResource> localResourceList = new ArrayList<>(); + if (blobstoreMap != null) { + List<LocalResource> tmp = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); + if (tmp != null) { + localResourceList.addAll(tmp); + } + } + + StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps); + List<String> dependencies = new ArrayList<>(); + if (stormCode.is_set_dependency_jars()) { + dependencies.addAll(stormCode.get_dependency_jars()); + } + if (stormCode.is_set_dependency_artifacts()) { + dependencies.addAll(stormCode.get_dependency_artifacts()); + } + for (String dependency : dependencies) { + localResourceList.add(new LocalResource(dependency, false)); + } + + if (!localResourceList.isEmpty()) { + File userDir = _localizer.getLocalUserFileCacheDir(user); + if (!_fsOps.fileExists(userDir)) { + _fsOps.forceMkdir(userDir); + } + List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir); + _fsOps.setupBlobPermissions(userDir, user); + for (LocalizedResource localizedResource : localizedResources) { + String keyName = localizedResource.getKey(); + //The sym link we are pointing to + File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath()); + + String symlinkName = null; + if (blobstoreMap != null) { + Map<String, Object> blobInfo = blobstoreMap.get(keyName); + if (blobInfo != null && blobInfo.containsKey("localname")) { + symlinkName = (String) blobInfo.get("localname"); + } else { + symlinkName = keyName; + } + } else { + // all things are from dependencies + symlinkName = keyName; + } + _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); + } + } + + return null; + } catch (Exception e) { + LOG.warn("Caught Exception While Downloading (rethrowing)... ", e); + throw e; + } + } + } + + //Visible for testing + AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) { + _conf = conf; + _isLocalMode = ConfigUtils.isLocalMode(conf); + _localizer = localizer; + _execService = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("Async Localizer") + .build()); + _basicPending = new HashMap<>(); + _blobPending = new HashMap<>(); + _fsOps = ops; + } + + public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) { + this(conf, localizer, AdvancedFSOps.make(conf)); + } + + @Override + public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException { + final String topologyId = assignment.get_topology_id(); + LocalDownloadedResource localResource = _basicPending.get(topologyId); + if (localResource == null) { + Callable<Void> c; + if (_isLocalMode) { + c = new DownloadBaseBlobsLocal(topologyId); + } else { + c = new DownloadBaseBlobsDistributed(topologyId); + } + localResource = new LocalDownloadedResource(_execService.submit(c)); + _basicPending.put(topologyId, localResource); + } + Future<Void> ret = localResource.reserve(port, assignment); + LOG.debug("Reserved basic {} {}", topologyId, localResource); + return ret; + } + + private static String resourcesJar() throws IOException { + String path = Utils.currentClasspath(); + if (path == null) { + return null; + } + + for (String jpath : path.split(File.pathSeparator)) { + if (jpath.endsWith(".jar")) { + if (Utils.zipDoesContainDir(jpath, ConfigUtils.RESOURCES_SUBDIR)) { + return jpath; + } + } + } + return null; + } + + @Override + public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) { + final String topologyId = assignment.get_topology_id(); + LocalDownloadedResource localResource = _basicPending.get(topologyId); + if (localResource == null) { + localResource = new LocalDownloadedResource(new AllDoneFuture()); + _basicPending.put(topologyId, localResource); + } + localResource.reserve(port, assignment); + LOG.debug("Recovered basic {} {}", topologyId, localResource); + + localResource = _blobPending.get(topologyId); + if (localResource == null) { + localResource = new LocalDownloadedResource(new AllDoneFuture()); + _blobPending.put(topologyId, localResource); + } + localResource.reserve(port, assignment); + LOG.debug("Recovered blobs {} {}", topologyId, localResource); + } + + @Override + public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) { + final String topologyId = assignment.get_topology_id(); + LocalDownloadedResource localResource = _blobPending.get(topologyId); + if (localResource == null) { + Callable<Void> c = new DownloadBlobs(topologyId); + localResource = new LocalDownloadedResource(_execService.submit(c)); + _blobPending.put(topologyId, localResource); + } + Future<Void> ret = localResource.reserve(port, assignment); + LOG.debug("Reserved blobs {} {}", topologyId, localResource); + return ret; + } + + @Override + public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException { + final String topologyId = assignment.get_topology_id(); + LOG.debug("Releasing slot for {} {}", topologyId, port); + LocalDownloadedResource localResource = _blobPending.get(topologyId); + if (localResource == null || !localResource.release(port, assignment)) { + LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource); + } else if (localResource.isDone()){ + LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port); + _blobPending.remove(topologyId); + Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId); + @SuppressWarnings("unchecked") + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + if (blobstoreMap != null) { + String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); + + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + try { + _localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo)); + } catch (Exception e) { + throw new IOException(e); + } + } + } + } else { + LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource); + } + + localResource = _basicPending.get(topologyId); + if (localResource == null || !localResource.release(port, assignment)) { + LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource); + } else if (localResource.isDone()){ + LOG.info("Released blob reference {} {} Cleaning up basic files...", topologyId, port); + _basicPending.remove(topologyId); + String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId); + _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId); + } else { + LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource); + } + } + + @Override + public synchronized void cleanupUnusedTopologies() throws IOException { + File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf)); + LOG.info("Cleaning up unused topologies in {}", distRoot); + File[] children = distRoot.listFiles(); + if (children != null) { + for (File topoDir : children) { + String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8"); + if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) { + _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId); + } + } + } + } + + @Override + public void shutdown() { + _execService.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java new file mode 100644 index 0000000..7105095 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.localizer; + +import java.io.IOException; +import java.util.concurrent.Future; + +import org.apache.storm.generated.LocalAssignment; + +/** + * Download blobs from the blob store and keep them up to date. + */ +public interface ILocalizer { + + /** + * Recover a running topology by incrementing references for what it has already downloaded. + * @param assignment the assignment the resources are for + * @param port the port the topology is running in. + */ + void recoverRunningTopology(LocalAssignment assignemnt, int port); + + /** + * Download storm.jar, storm.conf, and storm.ser for this topology if not done so already, + * and inc a reference count on them. + * @param assignment the assignment the resources are for + * @param port the port the topology is running on + * @return a future to let you know when they are done. + * @throws IOException on error + */ + Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException; + + /** + * Download the blobs for this topology (reading in list in from the config) + * and inc reference count for these blobs. + * PRECONDITION: requestDownloadBaseTopologyBlobs has completed downloading. + * @param assignment the assignment the resources are for + * @param port the port the topology is running on + * @return a future to let you know when they are done. + */ + Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port); + + /** + * dec reference count on all blobs associated with this topology. + * @param assignment the assignment the resources are for + * @param port the port the topology is running on + * @throws IOException on any error + */ + void releaseSlotFor(LocalAssignment assignment, int port) throws IOException; + + /** + * Clean up any topologies that are not in use right now. + * @throws IOException on any error. + */ + void cleanupUnusedTopologies() throws IOException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java new file mode 100644 index 0000000..9e91a93 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.localizer; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.storm.generated.LocalAssignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LocalDownloadedResource { + private static final Logger LOG = LoggerFactory.getLogger(LocalDownloadedResource.class); + private static class NoCancelFuture<T> implements Future<T> { + private final Future<T> _wrapped; + + public NoCancelFuture(Future<T> wrapped) { + _wrapped = wrapped; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + //cancel not currently supported + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return _wrapped.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return _wrapped.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return _wrapped.get(timeout, unit); + } + } + private static class PortNAssignment { + private final int _port; + private final LocalAssignment _assignment; + + public PortNAssignment(int port, LocalAssignment assignment) { + _port = port; + _assignment = assignment; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PortNAssignment)) { + return false; + } + PortNAssignment pna = (PortNAssignment) other; + return pna._port == _port && _assignment.equals(pna._assignment); + } + + @Override + public int hashCode() { + return (17 * _port) + _assignment.hashCode(); + } + + @Override + public String toString() { + return "{"+ _port + " " + _assignment +"}"; + } + } + private final Future<Void> _pending; + private final Set<PortNAssignment> _references; + private boolean _isDone; + + + public LocalDownloadedResource(Future<Void> pending) { + _pending = new NoCancelFuture<>(pending); + _references = new HashSet<>(); + _isDone = false; + } + + /** + * Reserve the resources + * @param port the port this is for + * @param la the assignment this is for + * @return a future that can be used to track it being downloaded. + */ + public synchronized Future<Void> reserve(int port, LocalAssignment la) { + PortNAssignment pna = new PortNAssignment(port, la); + if (!_references.add(pna)) { + LOG.warn("Resources {} already reserved {} for this topology", pna, _references); + } + return _pending; + } + + /** + * Release a port from the reference count, and update isDone if all is done. + * @param port the port to release + * @param la the assignment to release + * @return true if the port was being counted else false + */ + public synchronized boolean release(int port, LocalAssignment la) { + PortNAssignment pna = new PortNAssignment(port, la); + boolean ret = _references.remove(pna); + if (ret && _references.isEmpty()) { + _isDone = true; + } + return ret; + } + + /** + * Is this has been cleaned up completely. + * @return true if it is done else false + */ + public synchronized boolean isDone() { + return _isDone; + } + + @Override + public String toString() { + return _references.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java index 380e777..9f42b47 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java @@ -93,7 +93,7 @@ public class LocalizedResourceRetentionSet { i.remove(); } else { // since it failed to delete add it back so it gets retried - set.addResource(resource.getKey(), resource, resource.isUncompressed()); + set.add(resource.getKey(), resource, resource.isUncompressed()); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java index b5f00c3..62d5b2f 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java @@ -57,7 +57,7 @@ public class LocalizedResourceSet { return _localrsrcFiles.get(name); } - public void updateResource(String resourceName, LocalizedResource updatedResource, + public void putIfAbsent(String resourceName, LocalizedResource updatedResource, boolean uncompress) { if (uncompress) { _localrsrcArchives.putIfAbsent(resourceName, updatedResource); @@ -66,7 +66,7 @@ public class LocalizedResourceSet { } } - public void addResource(String resourceName, LocalizedResource newResource, boolean uncompress) { + public void add(String resourceName, LocalizedResource newResource, boolean uncompress) { if (uncompress) { _localrsrcArchives.put(resourceName, newResource); } else { @@ -76,9 +76,9 @@ public class LocalizedResourceSet { public boolean exists(String resourceName, boolean uncompress) { if (uncompress) { - return (_localrsrcArchives.get(resourceName) != null); + return _localrsrcArchives.containsKey(resourceName); } - return (_localrsrcFiles.get(resourceName) != null); + return _localrsrcFiles.containsKey(resourceName); } public boolean remove(LocalizedResource resource) { http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java index b91cecb..0135397 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java @@ -63,20 +63,6 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; */ public class Localizer { public static final Logger LOG = LoggerFactory.getLogger(Localizer.class); - - private Map _conf; - private int _threadPoolSize; - // thread pool for initial download - private ExecutorService _execService; - // thread pool for updates - private ExecutorService _updateExecService; - private int _blobDownloadRetries; - - // track resources - user to resourceSet - private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new - ConcurrentHashMap<String, LocalizedResourceSet>(); - - private String _localBaseDir; public static final String USERCACHE = "usercache"; public static final String FILECACHE = "filecache"; @@ -85,13 +71,29 @@ public class Localizer { public static final String ARCHIVESDIR = "archives"; private static final String TO_UNCOMPRESS = "_tmp_"; + + + + private final Map<String, Object> _conf; + private final int _threadPoolSize; + // thread pool for initial download + private final ExecutorService _execService; + // thread pool for updates + private final ExecutorService _updateExecService; + private final int _blobDownloadRetries; + + // track resources - user to resourceSet + private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new + ConcurrentHashMap<String, LocalizedResourceSet>(); + + private final String _localBaseDir; // cleanup private long _cacheTargetSize; private long _cacheCleanupPeriod; private ScheduledExecutorService _cacheCleanupService; - public Localizer(Map conf, String baseDir) { + public Localizer(Map<String, Object> conf, String baseDir) { _conf = conf; _localBaseDir = baseDir; // default cache size 10GB, converted to Bytes @@ -189,7 +191,7 @@ public class Localizer { LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path); LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path, uncompress); - lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress); + lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress); } } } @@ -369,7 +371,7 @@ public class Localizer { if (newlrsrcSet == null) { newlrsrcSet = newSet; } - newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); + newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); results.add(lrsrc); } catch (ExecutionException e) { @@ -451,7 +453,7 @@ public class Localizer { for (Future<LocalizedResource> futureRsrc: futures) { LocalizedResource lrsrc = futureRsrc.get(); lrsrc.addReference(topo); - lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); + lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); results.add(lrsrc); } } catch (ExecutionException e) { http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java index 7a54d96..4a76012 100644 --- a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java @@ -26,6 +26,7 @@ import org.apache.storm.multilang.BoltMsg; import org.apache.storm.multilang.ShellMsg; import org.apache.storm.topology.ReportedFailedException; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ShellBoltMessageQueue; import org.apache.storm.utils.ShellProcess; import clojure.lang.RT; @@ -90,6 +91,7 @@ public class ShellBolt implements IBolt { private ScheduledExecutorService heartBeatExecutorService; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false); + private boolean _isLocalMode = false; public ShellBolt(ShellComponent component) { this(component.get_execution_command(), component.get_script()); @@ -106,6 +108,9 @@ public class ShellBolt implements IBolt { public void prepare(Map stormConf, TopologyContext context, final OutputCollector collector) { + if (ConfigUtils.isLocalMode(stormConf)) { + _isLocalMode = true; + } Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue()); @@ -298,7 +303,7 @@ public class ShellBolt implements IBolt { processInfo); LOG.error(message, exception); _collector.reportError(exception); - if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error + if (!_isLocalMode && (_running || (exception instanceof Error))) { //don't exit if not running, unless it is an Error System.exit(11); } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java index 2f06102..02872d0 100644 --- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java +++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java @@ -17,17 +17,18 @@ */ package org.apache.storm.testing; -import org.apache.storm.topology.OutputFieldsDeclarer; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; + import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.InprocMessaging; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; public class FeederSpout extends BaseRichSpout { @@ -51,7 +52,15 @@ public class FeederSpout extends BaseRichSpout { public void feed(List<Object> tuple, Object msgId) { InprocMessaging.sendMessage(_id, new Values(tuple, msgId)); - } + } + + public void feedNoWait(List<Object> tuple, Object msgId) { + InprocMessaging.sendMessageNoWait(_id, new Values(tuple, msgId)); + } + + public void waitForReader() { + InprocMessaging.waitForReader(_id); + } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; @@ -63,17 +72,11 @@ public class FeederSpout extends BaseRichSpout { public void nextTuple() { List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id); - if(toEmit!=null) { + if (toEmit!=null) { List<Object> tuple = (List<Object>) toEmit.get(0); Object msgId = toEmit.get(1); _collector.emit(tuple, msgId); - } else { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } } @@ -97,4 +100,4 @@ public class FeederSpout extends BaseRichSpout { public Map<String, Object> getComponentConfiguration() { return new HashMap<String, Object>(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java index 8272b3c..f5d317e 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java +++ b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java @@ -111,7 +111,7 @@ public class TridentUtils { return Utils.thriftSerialize(t); } - public static <T> T thriftDeserialize(Class c, byte[] b) { + public static <T> T thriftDeserialize(Class<T> c, byte[] b) { return Utils.thriftDeserialize(c,b); } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index a244f6a..e2be8a7 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -20,20 +20,16 @@ package org.apache.storm.utils; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; +import org.apache.storm.daemon.supervisor.AdvancedFSOps; import org.apache.storm.generated.StormTopology; import org.apache.storm.validation.ConfigValidation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; -import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.lang.reflect.Field; import java.net.URLEncoder; import java.util.ArrayList; @@ -116,7 +112,7 @@ public class ConfigUtils { return mode; } - public static boolean isLocalMode(Map conf) { + public static boolean isLocalMode(Map<String, Object> conf) { String mode = (String) conf.get(Config.STORM_CLUSTER_MODE); if (mode != null) { if ("local".equals(mode)) { @@ -125,8 +121,9 @@ public class ConfigUtils { if ("distributed".equals(mode)) { return false; } + throw new IllegalArgumentException("Illegal cluster mode in conf: " + mode); } - throw new IllegalArgumentException("Illegal cluster mode in conf: " + mode); + return true; } public static int samplingRate(Map conf) { @@ -161,12 +158,12 @@ public class ConfigUtils { } // we use this "weird" wrapper pattern temporarily for mocking in clojure test - public static Map readStormConfig() { + public static Map<String, Object> readStormConfig() { return _instance.readStormConfigImpl(); } - public Map readStormConfigImpl() { - Map conf = Utils.readStormConfig(); + public Map<String, Object> readStormConfigImpl() { + Map<String, Object> conf = Utils.readStormConfig(); ConfigValidation.validateFields(conf); return conf; } @@ -246,14 +243,14 @@ public class ConfigUtils { return ret + FILE_SEPARATOR + "stormdist"; } - public static Map readSupervisorStormConfGivenPath(Map conf, String stormConfPath) throws IOException { - Map ret = new HashMap(conf); + public static Map<String, Object> readSupervisorStormConfGivenPath(Map<String, Object> conf, String stormConfPath) throws IOException { + Map<String, Object> ret = new HashMap<>(conf); ret.putAll(Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormConfPath)))); return ret; } - public static StormTopology readSupervisorStormCodeGivenPath(String stormCodePath) throws IOException { - return Utils.deserialize(FileUtils.readFileToByteArray(new File(stormCodePath)), StormTopology.class); + public static StormTopology readSupervisorStormCodeGivenPath(String stormCodePath, AdvancedFSOps ops) throws IOException { + return Utils.deserialize(ops.slurp(new File(stormCodePath)), StormTopology.class); } public static String masterStormJarPath(String stormRoot) { @@ -353,25 +350,24 @@ public class ConfigUtils { } // we use this "weird" wrapper pattern temporarily for mocking in clojure test - public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException { + public static Map<String, Object> readSupervisorStormConf(Map<String, Object> conf, String stormId) throws IOException { return _instance.readSupervisorStormConfImpl(conf, stormId); } - public Map readSupervisorStormConfImpl(Map conf, String stormId) throws IOException { + public Map<String, Object> readSupervisorStormConfImpl(Map<String, Object> conf, String stormId) throws IOException { String stormRoot = supervisorStormDistRoot(conf, stormId); String confPath = supervisorStormConfPath(stormRoot); return readSupervisorStormConfGivenPath(conf, confPath); } - // we use this "weird" wrapper pattern temporarily for mocking in clojure test - public static StormTopology readSupervisorTopology(Map conf, String stormId) throws IOException { - return _instance.readSupervisorTopologyImpl(conf, stormId); + public static StormTopology readSupervisorTopology(Map conf, String stormId, AdvancedFSOps ops) throws IOException { + return _instance.readSupervisorTopologyImpl(conf, stormId, ops); } - - public StormTopology readSupervisorTopologyImpl(Map conf, String stormId) throws IOException { + + public StormTopology readSupervisorTopologyImpl(Map conf, String stormId, AdvancedFSOps ops) throws IOException { String stormRoot = supervisorStormDistRoot(conf, stormId); String topologyPath = supervisorStormCodePath(stormRoot); - return readSupervisorStormCodeGivenPath(topologyPath); + return readSupervisorStormCodeGivenPath(topologyPath, ops); } public static String workerUserRoot(Map conf) { @@ -382,27 +378,6 @@ public class ConfigUtils { return (workerUserRoot(conf) + FILE_SEPARATOR + workerId); } - public static String getWorkerUser(Map conf, String workerId) { - LOG.info("GET worker-user for {}", workerId); - File file = new File(workerUserFile(conf, workerId)); - - try (InputStream in = new FileInputStream(file); - Reader reader = new InputStreamReader(in); - BufferedReader br = new BufferedReader(reader);) { - StringBuilder sb = new StringBuilder(); - int r; - while ((r = br.read()) != -1) { - char ch = (char) r; - sb.append(ch); - } - String ret = sb.toString().trim(); - return ret; - } catch (IOException e) { - LOG.error("Failed to get worker user for {}.", workerId); - return null; - } - } - public static String getIdFromBlobKey(String key) { if (key == null) return null; final String STORM_JAR_SUFFIX = "-stormjar.jar"; @@ -421,27 +396,6 @@ public class ConfigUtils { } // we use this "weird" wrapper pattern temporarily for mocking in clojure test - public static void setWorkerUserWSE(Map conf, String workerId, String user) throws IOException { - _instance.setWorkerUserWSEImpl(conf, workerId, user); - } - - public void setWorkerUserWSEImpl(Map conf, String workerId, String user) throws IOException { - LOG.info("SET worker-user {} {}", workerId, user); - File file = new File(workerUserFile(conf, workerId)); - file.getParentFile().mkdirs(); - - try (FileWriter fw = new FileWriter(file); - BufferedWriter writer = new BufferedWriter(fw);) { - writer.write(user); - } - } - - public static void removeWorkerUserWSE(Map conf, String workerId) { - LOG.info("REMOVE worker-user {}", workerId); - new File(workerUserFile(conf, workerId)).delete(); - } - - // we use this "weird" wrapper pattern temporarily for mocking in clojure test public static String workerArtifactsRoot(Map conf) { return _instance.workerArtifactsRootImpl(conf); } @@ -512,6 +466,10 @@ public class ConfigUtils { public static String workerPidPath(Map conf, String id, String pid) { return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid); } + + public static String workerPidPath(Map<String, Object> conf, String id, long pid) { + return workerPidPath(conf, id, String.valueOf(pid)); + } public static String workerHeartbeatsRoot(Map conf, String id) { return (workerRoot(conf, id) + FILE_SEPARATOR + "heartbeats"); http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java index 51250f4..8583e0d 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java +++ b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java @@ -19,41 +19,82 @@ package org.apache.storm.utils; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InprocMessaging { - private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<>(); - private static final Object _lock = new Object(); + private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>(); + private static ConcurrentMap<Integer, AtomicBoolean> _hasReader = new ConcurrentHashMap<>(); private static int port = 1; + private static final Logger LOG = LoggerFactory.getLogger(InprocMessaging.class); - public static int acquireNewPort() { - int ret; - synchronized(_lock) { - ret = port; - port++; - } + public synchronized static int acquireNewPort() { + int ret = port; + port++; return ret; } public static void sendMessage(int port, Object msg) { + waitForReader(port); + getQueue(port).add(msg); + } + + public static void sendMessageNoWait(int port, Object msg) { getQueue(port).add(msg); } public static Object takeMessage(int port) throws InterruptedException { + readerArrived(port); return getQueue(port).take(); } public static Object pollMessage(int port) { + readerArrived(port); return getQueue(port).poll(); - } + } + + private static AtomicBoolean getHasReader(int port) { + AtomicBoolean ab = _hasReader.get(port); + if (ab == null) { + _hasReader.putIfAbsent(port, new AtomicBoolean(false)); + ab = _hasReader.get(port); + } + return ab; + } - private static LinkedBlockingQueue<Object> getQueue(int port) { - synchronized(_lock) { - if(!_queues.containsKey(port)) { - _queues.put(port, new LinkedBlockingQueue<>()); + public static void waitForReader(int port) { + AtomicBoolean ab = getHasReader(port); + long start = Time.currentTimeMillis(); + while (!ab.get()) { + if (Time.isSimulating()) { + Time.advanceTime(100); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + //Ignored } - return _queues.get(port); + if (Time.currentTimeMillis() - start > 20000) { + LOG.error("DONE WAITING FOR READER AFTER {} ms", Time.currentTimeMillis() - start); + break; + } + } + } + + private static void readerArrived(int port) { + getHasReader(port).set(true); + } + + private synchronized static LinkedBlockingQueue<Object> getQueue(int port) { + if(!_queues.containsKey(port)) { + _queues.put(port, new LinkedBlockingQueue<Object>()); } + return _queues.get(port); } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/Time.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java index 1b36070..9b656ee 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Time.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java @@ -29,6 +29,7 @@ public class Time { public static final Logger LOG = LoggerFactory.getLogger(Time.class); private static AtomicBoolean simulating = new AtomicBoolean(false); + private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0); //TODO: should probably use weak references here or something private static volatile Map<Thread, AtomicLong> threadSleepTimes; private static final Object sleepTimesLock = new Object(); @@ -43,10 +44,18 @@ public class Time { } } + public static void startSimulatingAutoAdvanceOnSleep(long ms) { + synchronized(sleepTimesLock) { + startSimulating(); + autoAdvanceOnSleep.set(ms); + } + } + public static void stopSimulating() { synchronized(sleepTimesLock) { - simulating.set(false); - threadSleepTimes = null; + simulating.set(false); + autoAdvanceOnSleep.set(0); + threadSleepTimes = null; } } @@ -71,6 +80,10 @@ public class Time { throw new InterruptedException(); } } + long autoAdvance = autoAdvanceOnSleep.get(); + if (autoAdvance > 0) { + advanceTime(autoAdvance); + } Thread.sleep(10); } } finally { @@ -126,9 +139,10 @@ public class Time { } public static void advanceTime(long ms) { - if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); - if(ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); - simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms); + if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); + if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); + long newTime = simulatedCurrTimeMs.addAndGet(ms); + LOG.debug("Advanced simulated time to {}", newTime); } public static boolean isThreadWaiting(Thread t) { @@ -138,5 +152,5 @@ public class Time { time = threadSleepTimes.get(t); } return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue(); - } + } }