Repository: storm
Updated Branches:
  refs/heads/master 0946048eb -> c2cb25b95


STORM-2038: Disable symlinks with a config option


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1c906375
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1c906375
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1c906375

Branch: refs/heads/master
Commit: 1c9063756fe9e268676234757d54994aacfc0b7b
Parents: 0946048
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 27 15:06:43 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 16 12:07:05 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 docs/windows-users-guide.md                     |  9 ++++-
 storm-core/src/jvm/org/apache/storm/Config.java |  9 +++++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  9 +++++
 .../storm/daemon/supervisor/AdvancedFSOps.java  | 14 +++++--
 .../storm/daemon/supervisor/Container.java      | 39 ++++++++++++--------
 .../apache/storm/localizer/AsyncLocalizer.java  | 30 ++++++++-------
 .../org/apache/storm/localizer/Localizer.java   |  7 +++-
 8 files changed, 83 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index facd0bd..9d8c2d5 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -58,6 +58,7 @@ storm.codedistributor.class: 
"org.apache.storm.codedistributor.LocalFileSystemCo
 storm.workers.artifacts.dir: "workers-artifacts"
 storm.health.check.dir: "healthchecks"
 storm.health.check.timeout.ms: 5000
+storm.disable.symlinks: false
 
 ### nimbus.* configs are for the master
 nimbus.seeds : ["localhost"]

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/docs/windows-users-guide.md
----------------------------------------------------------------------
diff --git a/docs/windows-users-guide.md b/docs/windows-users-guide.md
index 4ebd929..9c9a850 100644
--- a/docs/windows-users-guide.md
+++ b/docs/windows-users-guide.md
@@ -23,5 +23,10 @@ One tricky point is, `administrator` group already has this 
privilege, but it's
 So if your account belongs to `administrator` group (and you don't want to 
change it), you may want to open `command prompt` with `run as administrator` 
and execute processes within that console.
 If you don't want to execute Storm processes directly (not on command prompt), 
please execute processes with `runas /user:administrator` to run as 
administrator account.
 
-Starting with Windows 10 Creators Update, it will be possible to activate a 
Developer Mode that supports creating symbolic links without `run as 
administrator`
-[Symlinks in Windows 
10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/)
+Starting with Windows 10 Creators Update, it will be possible to activate a 
Developer Mode that supports creating symbolic links without `run as 
administrator`
+[Symlinks in Windows 
10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/)
+
+Alternatively you can disable usage of symbolic links by setting the config 
`storm.disable.symlinks` to `true`
+on Nimbus and all of the Supervisor nodes.  This will also disable features 
that require symlinks.  Currently the only such feature is downloading
+dependent blobs, but this may change in the future.  Some topologies may rely on 
symbolic links to resources in the current working directory of the worker that 
are
+created as a convenience, so it is not a 100% backwards-compatible change.

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 92f81dc..0379a18 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1412,6 +1412,15 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_CPU_CAPACITY = 
"supervisor.cpu.capacity";
 
     /**
+     * On some systems (Windows, for example) symlinks require special 
privileges that not everyone wants to
+     * grant a headless user.  You can completely disable the use of symlinks 
by setting this config to true, but
+     * by doing so you may also lose some features from storm.  For example 
the blobstore feature
+     * does not currently work without symlinks enabled.
+     */
+    @isBoolean
+    public static final String DISABLE_SYMLINKS = "storm.disable.symlinks";
+
+    /**
      * The jvm opts provided to workers launched by this supervisor.
      * All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%",
      * "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with:

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index a8fab86..235ca3d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2477,6 +2477,15 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 throw new InvalidTopologyException(ex.getMessage());
             }
             validator.validate(topoName, topoConf, topology);
+            if ((boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+                @SuppressWarnings("unchecked")
+                Map<String, Object> blobMap = (Map<String, Object>) 
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+                if (blobMap != null && !blobMap.isEmpty()) {
+                    throw new InvalidTopologyException("symlinks are disabled 
so blobs are not supported but " +
+                  Config.TOPOLOGY_BLOBSTORE_MAP + " = " + blobMap);
+                }
+            }
+
             Utils.validateTopologyBlobStoreMap(topoConf, 
Sets.newHashSet(blobStore.listKeys()));
             long uniqueNum = submittedCount.incrementAndGet();
             String topoId = topoName + "-" + uniqueNum + "-" + 
Time.currentTimeSecs();

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 4b96197..d084720 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -56,13 +56,14 @@ public class AdvancedFSOps {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             return new AdvancedRunAsUserFSOps(conf);
         }
-        return new AdvancedFSOps();
+        return new AdvancedFSOps(conf);
     }
     
     private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
         private final Map<String, Object> _conf;
         
         public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+            super(conf);
             if (Utils.isOnWindows()) {
                 throw new UnsupportedOperationException("ERROR: Windows 
doesn't support running workers as different users yet");
             }
@@ -116,6 +117,7 @@ public class AdvancedFSOps {
     private static class AdvancedWindowsFSOps extends AdvancedFSOps {
 
         public AdvancedWindowsFSOps(Map<String, Object> conf) {
+            super(conf);
             if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                 throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
             }
@@ -140,10 +142,11 @@ public class AdvancedFSOps {
             return false;
         }
     }
+
+    protected final boolean _symlinksDisabled;
     
-    
-    protected AdvancedFSOps() {
-        //NOOP, but restricted permissions
+    protected AdvancedFSOps(Map<String, Object> conf) {
+        _symlinksDisabled = 
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
     }
 
     /**
@@ -341,6 +344,9 @@ public class AdvancedFSOps {
      * @throws IOException on any error.
      */
     public void createSymlink(File link, File target) throws IOException {
+        if (_symlinksDisabled) {
+            throw new IOException("Symlinks have been disabled, this hsould 
not be called");
+        }
         Path plink = link.toPath().toAbsolutePath();
         Path ptarget = target.toPath().toAbsolutePath();
         LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index 9f41682..e63078c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -87,7 +87,8 @@ public abstract class Container implements Killable {
     protected final AdvancedFSOps _ops;
     protected final ResourceIsolationInterface _resourceIsolationManager;
     protected ContainerType _type;
-    
+    protected final boolean _symlinksDisabled;
+
     /**
      * Create a new Container.
      * @param type the type of container being made.
@@ -109,6 +110,8 @@ public abstract class Container implements Killable {
         assert(conf != null);
         assert(supervisorId != null);
         
+        _symlinksDisabled = 
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+        
         if (ops == null) {
             ops = AdvancedFSOps.make(conf);
         }
@@ -380,11 +383,13 @@ public abstract class Container implements Killable {
      */
     protected void createArtifactsLink() throws IOException {
         _type.assertFull();
-        File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId));
-        File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
-        if (_ops.fileExists(workerDir)) {
-            LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to 
its port artifacts directory", _workerId, _topologyId);
-            _ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+        if (!_symlinksDisabled) {
+            File workerDir = new File(ConfigUtils.workerRoot(_conf, 
_workerId));
+            File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
+            if (_ops.fileExists(workerDir)) {
+                LOG.debug("Creating symlinks for worker-id: {} topology-id: {} 
to its port artifacts directory", _workerId, _topologyId);
+                _ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+            }
         }
     }
     
@@ -421,15 +426,19 @@ public abstract class Container implements Killable {
         }
         resourceFileNames.addAll(blobFileNames);
 
-        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(), 
resourceFileNames);
-        if(targetResourcesDir.exists()) {
-            _ops.createSymlink(new File(workerRoot, 
ConfigUtils.RESOURCES_SUBDIR),  targetResourcesDir );
-        } else {
-            LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}." , _workerId, _topologyId, 
targetResourcesDir.toString() );
-        }
-        for (String fileName : blobFileNames) {
-            _ops.createSymlink(new File(workerRoot, fileName),
-                    new File(stormRoot, fileName));
+        if (!_symlinksDisabled) {
+            LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(), 
resourceFileNames);
+            if (targetResourcesDir.exists()) {
+                _ops.createSymlink(new File(workerRoot, 
ConfigUtils.RESOURCES_SUBDIR),  targetResourcesDir );
+            } else {
+                LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}." , _workerId, _topologyId, 
targetResourcesDir.toString() );
+            }
+            for (String fileName : blobFileNames) {
+                _ops.createSymlink(new File(workerRoot, fileName),
+                        new File(stormRoot, fileName));
+            }
+        } else if (blobFileNames.size() > 0) {
+            LOG.warn("Symlinks are disabled, no symlinks created for blobs 
{}", blobFileNames);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
index d3e3925..1fb504f 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -94,6 +94,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     private final Map<String, LocalDownloadedResource> _basicPending;
     private final Map<String, LocalDownloadedResource> _blobPending;
     private final AdvancedFSOps _fsOps;
+    private final boolean _symlinksDisabled;
 
     private class DownloadBaseBlobsDistributed implements Callable<Void> {
         protected final String _topologyId;
@@ -250,24 +251,26 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                     }
                     List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir);
                     _fsOps.setupBlobPermissions(userDir, user);
-                    for (LocalizedResource localizedResource : 
localizedResources) {
-                        String keyName = localizedResource.getKey();
-                        //The sym link we are pointing to
-                        File rsrcFilePath = new 
File(localizedResource.getCurrentSymlinkPath());
+                    if (!_symlinksDisabled) {
+                        for (LocalizedResource localizedResource : 
localizedResources) {
+                            String keyName = localizedResource.getKey();
+                            //The sym link we are pointing to
+                            File rsrcFilePath = new 
File(localizedResource.getCurrentSymlinkPath());
 
-                        String symlinkName = null;
-                        if (blobstoreMap != null) {
-                            Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
-                            if (blobInfo != null && 
blobInfo.containsKey("localname")) {
-                                symlinkName = (String) 
blobInfo.get("localname");
+                            String symlinkName = null;
+                            if (blobstoreMap != null) {
+                                Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
+                                if (blobInfo != null && 
blobInfo.containsKey("localname")) {
+                                    symlinkName = (String) 
blobInfo.get("localname");
+                                } else {
+                                    symlinkName = keyName;
+                                }
                             } else {
+                                // all things are from dependencies
                                 symlinkName = keyName;
                             }
-                        } else {
-                            // all things are from dependencies
-                            symlinkName = keyName;
+                            _fsOps.createSymlink(new File(stormroot, 
symlinkName), rsrcFilePath);
                         }
-                        _fsOps.createSymlink(new File(stormroot, symlinkName), 
rsrcFilePath);
                     }
                 }
 
@@ -282,6 +285,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     //Visible for testing
     AsyncLocalizer(Map<String, Object> conf, Localizer localizer, 
AdvancedFSOps ops) {
         _conf = conf;
+        _symlinksDisabled = 
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
         _isLocalMode = ConfigUtils.isLocalMode(conf);
         _localizer = localizer;
         _execService = Executors.newFixedThreadPool(1,  

http://git-wip-us.apache.org/repos/asf/storm/blob/1c906375/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
index 0135397..db6f7ae 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
@@ -345,6 +345,9 @@ public class Localizer {
         if (lrsrc == null) {
           LOG.warn("blob requested for update doesn't exist: {}", key);
           continue;
+        } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, 
false)) {
+          LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
+          continue;
         } else {
           // update it if either the version isn't the latest or if any local 
blob files are missing
           if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
@@ -400,7 +403,9 @@ public class Localizer {
   public synchronized List<LocalizedResource> getBlobs(List<LocalResource> 
localResources,
       String user, String topo, File userFileDir)
       throws AuthorizationException, KeyNotFoundException, IOException {
-
+    if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+      throw new KeyNotFoundException("symlinks are disabled so blobs cannot be 
downloaded.");
+    }
     LocalizedResourceSet newSet = new LocalizedResourceSet(user);
     LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
     if (lrsrcSet == null) {

Reply via email to