Repository: storm Updated Branches: refs/heads/master 0946048eb -> c2cb25b95
STORM-2038: Disable symlinks with a config option Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1c906375 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1c906375 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1c906375 Branch: refs/heads/master Commit: 1c9063756fe9e268676234757d54994aacfc0b7b Parents: 0946048 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Feb 27 15:06:43 2017 -0600 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Thu Mar 16 12:07:05 2017 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 1 + docs/windows-users-guide.md | 9 ++++- storm-core/src/jvm/org/apache/storm/Config.java | 9 +++++ .../org/apache/storm/daemon/nimbus/Nimbus.java | 9 +++++ .../storm/daemon/supervisor/AdvancedFSOps.java | 14 +++++-- .../storm/daemon/supervisor/Container.java | 39 ++++++++++++-------- .../apache/storm/localizer/AsyncLocalizer.java | 30 ++++++++------- .../org/apache/storm/localizer/Localizer.java | 7 +++- 8 files changed, 83 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index facd0bd..9d8c2d5 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -58,6 +58,7 @@ storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCo storm.workers.artifacts.dir: "workers-artifacts" storm.health.check.dir: "healthchecks" storm.health.check.timeout.ms: 5000 +storm.disable.symlinks: false ### nimbus.* configs are for the master nimbus.seeds : ["localhost"] http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/docs/windows-users-guide.md ---------------------------------------------------------------------- diff --git 
a/docs/windows-users-guide.md b/docs/windows-users-guide.md index 4ebd929..9c9a850 100644 --- a/docs/windows-users-guide.md +++ b/docs/windows-users-guide.md @@ -23,5 +23,10 @@ One tricky point is, `administrator` group already has this privilege, but it's So if your account belongs to `administrator` group (and you don't want to change it), you may want to open `command prompt` with `run as administrator` and execute processes within that console. If you don't want to execute Storm processes directly (not on command prompt), please execute processes with `runas /user:administrator` to run as administrator account. -Starting with Windows 10 Creators Update, it will be possible to activate a Developer Mode that supports creating symbolic links without `run as administrator` -[Symlinks in Windows 10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/) +Starting with Windows 10 Creators Update, it will be possible to activate a Developer Mode that supports creating symbolic links without `run as administrator` +[Symlinks in Windows 10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/) + +Alternatively you can disable usage of symbolic links by setting the config `storm.disable.symlinks` to `true` +on Nimbus and all of the Supervisor nodes. This will also disable features that require symlinks. Currently this is only downloading +dependent blobs, but may change in the future. Some topologies may rely on symbolic links to resources in the current working directory of the worker that are +created as a convenience, so it is not a 100% backwards compatible change. 
http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 92f81dc..0379a18 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1412,6 +1412,15 @@ public class Config extends HashMap<String, Object> { public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity"; /** + * On some systems (windows for example) symlinks require special privileges that not everyone wants to + * grant a headless user. You can completely disable the use of symlinks by setting this config to true, but + * by doing so you may also lose some features from storm. For example the blobstore feature + * does not currently work without symlinks enabled. + */ + @isBoolean + public static final String DISABLE_SYMLINKS = "storm.disable.symlinks"; + + /** * The jvm opts provided to workers launched by this supervisor. 
* All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%", * "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with: http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java index a8fab86..235ca3d 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java @@ -2477,6 +2477,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { throw new InvalidTopologyException(ex.getMessage()); } validator.validate(topoName, topoConf, topology); + if ((boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { + @SuppressWarnings("unchecked") + Map<String, Object> blobMap = (Map<String, Object>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + if (blobMap != null && !blobMap.isEmpty()) { + throw new InvalidTopologyException("symlinks are disabled so blobs are not supported but " + + Config.TOPOLOGY_BLOBSTORE_MAP + " = " + blobMap); + } + } + Utils.validateTopologyBlobStoreMap(topoConf, Sets.newHashSet(blobStore.listKeys())); long uniqueNum = submittedCount.incrementAndGet(); String topoId = topoName + "-" + uniqueNum + "-" + Time.currentTimeSecs(); http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java index 4b96197..d084720 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -56,13 +56,14 @@ public 
class AdvancedFSOps { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { return new AdvancedRunAsUserFSOps(conf); } - return new AdvancedFSOps(); + return new AdvancedFSOps(conf); } private static class AdvancedRunAsUserFSOps extends AdvancedFSOps { private final Map<String, Object> _conf; public AdvancedRunAsUserFSOps(Map<String, Object> conf) { + super(conf); if (Utils.isOnWindows()) { throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet"); } @@ -116,6 +117,7 @@ public class AdvancedFSOps { private static class AdvancedWindowsFSOps extends AdvancedFSOps { public AdvancedWindowsFSOps(Map<String, Object> conf) { + super(conf); if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet"); } @@ -140,10 +142,11 @@ public class AdvancedFSOps { return false; } } + + protected final boolean _symlinksDisabled; - - protected AdvancedFSOps() { - //NOOP, but restricted permissions + protected AdvancedFSOps(Map<String, Object> conf) { + _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); } /** @@ -341,6 +344,9 @@ public class AdvancedFSOps { * @throws IOException on any error. 
*/ public void createSymlink(File link, File target) throws IOException { + if (_symlinksDisabled) { + throw new IOException("Symlinks have been disabled, this hsould not be called"); + } Path plink = link.toPath().toAbsolutePath(); Path ptarget = target.toPath().toAbsolutePath(); LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget); http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java index 9f41682..e63078c 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java @@ -87,7 +87,8 @@ public abstract class Container implements Killable { protected final AdvancedFSOps _ops; protected final ResourceIsolationInterface _resourceIsolationManager; protected ContainerType _type; - + protected final boolean _symlinksDisabled; + /** * Create a new Container. * @param type the type of container being made. 
@@ -109,6 +110,8 @@ public abstract class Container implements Killable { assert(conf != null); assert(supervisorId != null); + _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); + if (ops == null) { ops = AdvancedFSOps.make(conf); } @@ -380,11 +383,13 @@ public abstract class Container implements Killable { */ protected void createArtifactsLink() throws IOException { _type.assertFull(); - File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId)); - File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); - if (_ops.fileExists(workerDir)) { - LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId); - _ops.createSymlink(new File(workerDir, "artifacts"), topoDir); + if (!_symlinksDisabled) { + File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId)); + File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); + if (_ops.fileExists(workerDir)) { + LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId); + _ops.createSymlink(new File(workerDir, "artifacts"), topoDir); + } } } @@ -421,15 +426,19 @@ public abstract class Container implements Killable { } resourceFileNames.addAll(blobFileNames); - LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); - if(targetResourcesDir.exists()) { - _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), targetResourcesDir ); - } else { - LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}." 
, _workerId, _topologyId, targetResourcesDir.toString() ); - } - for (String fileName : blobFileNames) { - _ops.createSymlink(new File(workerRoot, fileName), - new File(stormRoot, fileName)); + if (!_symlinksDisabled) { + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); + if (targetResourcesDir.exists()) { + _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), targetResourcesDir ); + } else { + LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}." , _workerId, _topologyId, targetResourcesDir.toString() ); + } + for (String fileName : blobFileNames) { + _ops.createSymlink(new File(workerRoot, fileName), + new File(stormRoot, fileName)); + } + } else if (blobFileNames.size() > 0) { + LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java index d3e3925..1fb504f 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java @@ -94,6 +94,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private final Map<String, LocalDownloadedResource> _basicPending; private final Map<String, LocalDownloadedResource> _blobPending; private final AdvancedFSOps _fsOps; + private final boolean _symlinksDisabled; private class DownloadBaseBlobsDistributed implements Callable<Void> { protected final String _topologyId; @@ -250,24 +251,26 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { } List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir); _fsOps.setupBlobPermissions(userDir, user); - for (LocalizedResource localizedResource : localizedResources) { - String keyName = localizedResource.getKey(); - //The sym link we are pointing to - File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath()); + if (!_symlinksDisabled) { + for (LocalizedResource localizedResource : localizedResources) { + String keyName = localizedResource.getKey(); + //The sym link we are pointing to + File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath()); - String symlinkName = null; - if (blobstoreMap != null) { - Map<String, Object> blobInfo = blobstoreMap.get(keyName); - if (blobInfo != null && blobInfo.containsKey("localname")) { - symlinkName = (String) blobInfo.get("localname"); + String symlinkName = null; + if (blobstoreMap != null) { + Map<String, Object> blobInfo = blobstoreMap.get(keyName); + if (blobInfo != null && blobInfo.containsKey("localname")) { + symlinkName = (String) blobInfo.get("localname"); + } else { + symlinkName = keyName; + } } else { + // all things are from dependencies symlinkName = keyName; } - } else { - // all things are from dependencies - symlinkName = keyName; + _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); } - _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); } } @@ -282,6 +285,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { //Visible for testing AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) { _conf = conf; + _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); _isLocalMode = ConfigUtils.isLocalMode(conf); _localizer = localizer; _execService = Executors.newFixedThreadPool(1, http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java index 0135397..db6f7ae 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java @@ -345,6 +345,9 @@ public class Localizer { if (lrsrc == null) { LOG.warn("blob requested for update doesn't exist: {}", key); continue; + } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { + LOG.warn("symlinks are disabled so blobs cannot be downloaded."); + continue; } else { // update it if either the version isn't the latest or if any local blob files are missing if (!isLocalizedResourceUpToDate(lrsrc, blobstore) || @@ -400,7 +403,9 @@ public class Localizer { public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources, String user, String topo, File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException { - + if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { + throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded."); + } LocalizedResourceSet newSet = new LocalizedResourceSet(user); LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); if (lrsrcSet == null) {