Repository: storm
Updated Branches:
  refs/heads/master c5988e83d -> 8ffa920d3


STORM-3026: Upgrade ZK instance for security


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ffa920d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ffa920d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ffa920d

Branch: refs/heads/master
Commit: 8ffa920d3894634aa078f0fdf6b02d270262caf4
Parents: c5988e8
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Apr 13 16:31:57 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Apr 13 16:31:57 2018 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +
 .../org/apache/storm/cluster/ClusterUtils.java  |  30 +-
 .../org/apache/storm/cluster/DaemonType.java    |  10 +-
 .../storm/cluster/IStormClusterState.java       |  14 +-
 .../storm/cluster/StormClusterStateImpl.java    |  42 +--
 .../apache/storm/cluster/ZKStateStorage.java    |  21 +-
 .../transactional/state/TransactionalState.java |   5 +-
 .../topology/state/TransactionalState.java      |   5 +-
 .../org/apache/storm/utils/CuratorUtils.java    |  38 ++-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  25 +-
 .../apache/storm/zookeeper/ClientZookeeper.java |  34 +-
 .../apache/storm/utils/CuratorUtilsTest.java    |   4 +-
 .../apache/storm/command/shell_submission.clj   |  31 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  13 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   4 +-
 .../java/org/apache/storm/DaemonConfig.java     |  15 +
 .../apache/storm/blobstore/BlobStoreUtils.java  |   6 +-
 .../storm/blobstore/LocalFsBlobStore.java       |   3 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  23 +-
 .../daemon/supervisor/SupervisorUtils.java      |   2 -
 .../apache/storm/zookeeper/AclEnforcement.java  | 321 +++++++++++++++++++
 21 files changed, 519 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c985c12..0e67957 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -48,6 +48,8 @@ storm.messaging.transport: 
"org.apache.storm.messaging.netty.Context"
 storm.nimbus.retry.times: 5
 storm.nimbus.retry.interval.millis: 2000
 storm.nimbus.retry.intervalceiling.millis: 60000
+storm.nimbus.zookeeper.acls.check: true
+storm.nimbus.zookeeper.acls.fixup: true
 storm.auth.simple-white-list.users: []
 storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
 storm.meta.serialization.delegate: 
"org.apache.storm.serialization.GzipThriftSerializationDelegate"

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java 
b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index b3dfc7d..147f586 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -46,7 +46,6 @@ public class ClusterUtils {
     public static final String ZK_SEPERATOR = "/";
 
     public static final String ASSIGNMENTS_ROOT = "assignments";
-    public static final String CODE_ROOT = "code";
     public static final String STORMS_ROOT = "storms";
     public static final String SUPERVISORS_ROOT = "supervisors";
     public static final String WORKERBEATS_ROOT = "workerbeats";
@@ -98,15 +97,38 @@ public class ClusterUtils {
         _instance = INSTANCE;
     }
 
-    public static List<ACL> mkTopoOnlyAcls(Map<String, Object> topoConf) 
throws NoSuchAlgorithmException {
+    /**
+     * Get ZK ACLs for a topology to have read/write access.
+     * @param topoConf the topology config.
+     * @return the ACLs.
+     */
+    public static List<ACL> mkTopoReadWriteAcls(Map<String, Object> topoConf) {
+        return mkTopoAcls(topoConf, ZooDefs.Perms.ALL);
+    }
+
+    /**
+     * Get ZK ACLs for a topology to have read only access.
+     * @param topoConf the topology config.
+     * @return the ACLs.
+     */
+    public static List<ACL> mkTopoReadOnlyAcls(Map<String, Object> topoConf) {
+        return mkTopoAcls(topoConf, ZooDefs.Perms.READ);
+    }
+
+    private static List<ACL> mkTopoAcls(Map<String, Object> topoConf, int 
perms) {
         List<ACL> aclList = null;
         String payload = (String) 
topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
         if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
             aclList = new ArrayList<>();
             ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
             aclList.add(acl1);
-            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
-            aclList.add(acl2);
+            try {
+                ACL acl2 = new ACL(perms, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
+                aclList.add(acl2);
+            } catch (NoSuchAlgorithmException e) {
+                //Should only happen on a badly configured system
+                throw new RuntimeException(e);
+            }
         }
         return aclList;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java 
b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
index 4f3865e..551f347 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
@@ -62,8 +62,11 @@ public enum DaemonType {
                     return ZooDefs.Ids.CREATOR_ALL_ACL;
                 case DRPC:
                     List<ACL> ret = new 
ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
-                    ret.add(new ACL(ZooDefs.Perms.READ,
-                        
Utils.parseZkId((String)conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL), 
Config.STORM_ZOOKEEPER_DRPC_ACL)));
+                    String drpcAcl = 
(String)conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL);
+                    if (drpcAcl != null) {
+                        ret.add(new ACL(ZooDefs.Perms.READ,
+                            Utils.parseZkId(drpcAcl, 
Config.STORM_ZOOKEEPER_DRPC_ACL)));
+                    } //else we assume it is the same as the SUPER_ACL which 
is covered by CREATOR_ALL
                     return ret;
                 default:
                     throw new IllegalStateException("WorkerTokens for " + type 
+ " are not currently supported.");
@@ -81,8 +84,7 @@ public enum DaemonType {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DaemonType.class);
     @VisibleForTesting
-    public static final List<ACL> NIMBUS_SUPERVISOR_ZK_ACLS = 
Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
-        new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
+    public static final List<ACL> NIMBUS_SUPERVISOR_ZK_ACLS = 
ZooDefs.Ids.CREATOR_ALL_ACL;
     private static List<ACL> getDefaultNimbusSupervisorZkAcls(Map<String, 
Object> conf) {
         if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
             return NIMBUS_SUPERVISOR_ZK_ACLS;

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java 
b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 8a922ac..d2fccaa 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -131,7 +131,7 @@ public interface IStormClusterState {
 
     SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-    void setupHeatbeats(String stormId);
+    void setupHeatbeats(String stormId, Map<String, Object> topoConf);
 
     void teardownHeartbeats(String stormId);
 
@@ -154,7 +154,7 @@ public interface IStormClusterState {
      */
     NimbusInfo getLeader(Runnable callback);
 
-    void setTopologyLogConfig(String stormId, LogConfig logConfig);
+    void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, 
Object> topoConf);
 
     LogConfig topologyLogConfig(String stormId, Runnable cb);
 
@@ -170,7 +170,7 @@ public interface IStormClusterState {
 
     /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. 
Will be removed soon. */
     @Deprecated
-    void setupBackpressure(String stormId);
+    void setupBackpressure(String stormId, Map<String, Object> topoConf);
 
     /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. 
Will be removed soon. */
     @Deprecated
@@ -180,13 +180,13 @@ public interface IStormClusterState {
     @Deprecated
     void removeWorkerBackpressure(String stormId, String node, Long port);
 
-    void activateStorm(String stormId, StormBase stormBase);
+    void activateStorm(String stormId, StormBase stormBase, Map<String, 
Object> topoConf);
 
     void updateStorm(String stormId, StormBase newElems);
 
     void removeStormBase(String stormId);
 
-    void setAssignment(String stormId, Assignment info);
+    void setAssignment(String stormId, Assignment info, Map<String, Object> 
topoConf);
 
     void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer 
versionInfo);
 
@@ -202,11 +202,13 @@ public interface IStormClusterState {
 
     void reportError(String stormId, String componentId, String node, Long 
port, Throwable error);
 
+    void setupErrors(String stormId, Map<String, Object> topoConf);
+
     List<ErrorInfo> errors(String stormId, String componentId);
 
     ErrorInfo lastError(String stormId, String componentId);
 
-    void setCredentials(String stormId, Credentials creds, Map<String, Object> 
topoConf) throws NoSuchAlgorithmException;
+    void setCredentials(String stormId, Credentials creds, Map<String, Object> 
topoConf);
 
     Credentials credentials(String stormId, Runnable callback);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index fc02b8e..befd09f 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -19,7 +19,6 @@
 package org.apache.storm.cluster;
 
 import java.nio.ByteBuffer;
-import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.ArrayList;
@@ -294,7 +293,6 @@ public class StormClusterStateImpl implements 
IStormClusterState {
                     
stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
                     
stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), 
Utils.serialize(nimbusSummary), defaultAcls);
                 }
-
             }
         });
 
@@ -435,8 +433,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setupHeatbeats(String stormId) {
-        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), 
defaultAcls);
+    public void setupHeatbeats(String stormId, Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.WORKERBEATS_SUBTREE, defaultAcls);
+        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
     }
 
     @Override
@@ -491,8 +490,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
-        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), 
Utils.serialize(logConfig), defaultAcls);
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig, 
Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, defaultAcls);
+        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), 
Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
     }
 
     @Override
@@ -555,8 +555,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
         return ret;
     }
     @Override
-    public void setupBackpressure(String stormId) {
-        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), 
defaultAcls);
+    public void setupBackpressure(String stormId, Map<String, Object> 
topoConf) {
+        stateStorage.mkdirs(ClusterUtils.BACKPRESSURE_SUBTREE, defaultAcls);
+        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
     }
 
     @Override
@@ -582,9 +583,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
         }
     }
     @Override
-    public void activateStorm(String stormId, StormBase stormBase) {
+    public void activateStorm(String stormId, StormBase stormBase, Map<String, 
Object> topoConf) {
         String path = ClusterUtils.stormPath(stormId);
-        stateStorage.set_data(path, Utils.serialize(stormBase), defaultAcls);
+        stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, defaultAcls);
+        stateStorage.set_data(path, Utils.serialize(stormBase), 
ClusterUtils.mkTopoReadOnlyAcls(topoConf));
         this.assignmentsBackend.keepStormId(stormBase.get_name(), stormId);
     }
 
@@ -596,7 +598,6 @@ public class StormClusterStateImpl implements 
IStormClusterState {
      */
     @Override
     public void updateStorm(String stormId, StormBase newElems) {
-
         StormBase stormBase = stormBase(stormId, null);
         if (stormBase.get_component_executors() != null) {
 
@@ -683,9 +684,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setAssignment(String stormId, Assignment info) {
+    public void setAssignment(String stormId, Assignment info, Map<String, 
Object> topoConf) {
         byte[] serAssignment = Utils.serialize(info);
-        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), 
serAssignment, defaultAcls);
+        stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, defaultAcls);
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), 
Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
         this.assignmentsBackend.keepOrUpdateAssignment(stormId, info);
     }
 
@@ -736,8 +738,13 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void reportError(String stormId, String componentId, String node, 
Long port, Throwable error) {
+    public void setupErrors(String stormId, Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.ERRORS_SUBTREE, defaultAcls);
+        stateStorage.mkdirs(ClusterUtils.errorStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
+    }
 
+    @Override
+    public void reportError(String stormId, String componentId, String node, 
Long port, Throwable error) {
         String path = ClusterUtils.errorPath(stormId, componentId);
         String lastErrorPath = ClusterUtils.lastErrorPath(stormId, 
componentId);
         ErrorInfo errorInfo = new 
ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
@@ -795,7 +802,6 @@ public class StormClusterStateImpl implements 
IStormClusterState {
 
     @Override
     public ErrorInfo lastError(String stormId, String componentId) {
-
         String path = ClusterUtils.lastErrorPath(stormId, componentId);
         if (stateStorage.node_exists(path, false)) {
             ErrorInfo errorInfo = 
ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), 
ErrorInfo.class);
@@ -806,11 +812,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setCredentials(String stormId, Credentials creds, Map<String, 
Object> topoConf) throws NoSuchAlgorithmException {
-        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+    public void setCredentials(String stormId, Credentials creds, Map<String, 
Object> topoConf) {
+        List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf);
         String path = ClusterUtils.credentialsPath(stormId);
         stateStorage.set_data(path, Utils.serialize(creds), aclList);
-
     }
 
     @Override
@@ -841,6 +846,7 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     @Override
     public void addPrivateWorkerKey(WorkerTokenServiceType type, String 
topologyId, long keyVersion, PrivateWorkerKey key) {
         assert context.getDaemonType() == DaemonType.NIMBUS;
+        stateStorage.mkdirs(ClusterUtils.SECRET_KEYS_SUBTREE, defaultAcls);
         List<ACL> secretAcls = context.getZkSecretAcls(type);
         String path = ClusterUtils.secretKeysPath(type, topologyId, 
keyVersion);
         LOG.debug("Storing private key for {} connecting to a {} at {} with 
ACL {}\n\n", topologyId, type, path, secretAcls);

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java 
b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index 8ae6c5f..fb76169 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -77,19 +77,20 @@ public class ZKStateStorage implements IStateStorage {
     public ZKStateStorage(Map<String, Object> conf, Map<String, Object> 
authConf, ClusterStateContext context) throws Exception {
         this.conf = conf;
         this.authConf = authConf;
-        if (context.getDaemonType().equals(DaemonType.NIMBUS))
+        if (context.getDaemonType().equals(DaemonType.NIMBUS)) {
             this.isNimbus = true;
+        }
 
         // just mkdir STORM_ZOOKEEPER_ROOT dir
-        CuratorFramework zkTemp = mkZk();
+        CuratorFramework zkTemp = mkZk(context.getDaemonType());
         String rootPath = 
String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
         ClientZookeeper.mkdirs(zkTemp, rootPath, context.getDefaultZkAcls());
         zkTemp.close();
 
         active = new AtomicBoolean(true);
-        zkWriter = mkZk(new ZkWatcherCallBack());
+        zkWriter = mkZk(new ZkWatcherCallBack(), context.getDaemonType());
         if (isNimbus) {
-            zkReader = mkZk(new ZkWatcherCallBack());
+            zkReader = mkZk(new ZkWatcherCallBack(), context.getDaemonType());
         } else {
             zkReader = zkWriter;
         }
@@ -97,15 +98,15 @@ public class ZKStateStorage implements IStateStorage {
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorFramework mkZk() throws IOException {
-        return ClientZookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS), 
conf.get(Config.STORM_ZOOKEEPER_PORT), "",
-                new DefaultWatcherCallBack(), authConf);
+    private CuratorFramework mkZk(DaemonType type) {
+        return ClientZookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS),
+            conf.get(Config.STORM_ZOOKEEPER_PORT), "", new 
DefaultWatcherCallBack(), authConf, type);
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorFramework mkZk(WatcherCallBack watcher) throws 
NumberFormatException, IOException {
-        return ClientZookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
-                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), 
watcher, authConf);
+    private CuratorFramework mkZk(WatcherCallBack watcher, DaemonType type) 
throws NumberFormatException {
+        return ClientZookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS),
+            conf.get(Config.STORM_ZOOKEEPER_PORT), 
String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf, type);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
 
b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index c940b3f..21e2b5d 100644
--- 
a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ 
b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -18,6 +18,7 @@
 package org.apache.storm.transactional.state;
 
 import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
 import org.apache.storm.utils.Utils;
@@ -65,7 +66,7 @@ public class TransactionalState {
             List<String> servers = (List<String>) getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
             ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
-            CuratorFramework initter = CuratorUtils.newCuratorStarted(conf, 
servers, port, auth);
+            CuratorFramework initter = CuratorUtils.newCuratorStarted(conf, 
servers, port, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
             _zkAcls = Utils.getWorkerACL(conf);
             try {
                 TransactionalState.createNode(initter, transactionalRoot, 
null, null, null);
@@ -77,7 +78,7 @@ public class TransactionalState {
             }
             initter.close();
                                     
-            _curator = CuratorUtils.newCuratorStarted(conf, servers, port, 
rootDir, auth);
+            _curator = CuratorUtils.newCuratorStarted(conf, servers, port, 
rootDir, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
             _ser = new KryoValuesSerializer(conf);
             _des = new KryoValuesDeserializer(conf);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 61d50f8..6204a3f 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
 import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.CuratorUtils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
@@ -65,7 +66,7 @@ public class TransactionalState {
             List<String> servers = (List<String>) getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
             ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
-            CuratorFramework initter = CuratorUtils.newCuratorStarted(conf, 
servers, port, auth);
+            CuratorFramework initter = CuratorUtils.newCuratorStarted(conf, 
servers, port, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
             _zkAcls = Utils.getWorkerACL(conf);
             try {
                 TransactionalState.createNode(initter, transactionalRoot, 
null, null, null);
@@ -77,7 +78,7 @@ public class TransactionalState {
             }
             initter.close();
                                     
-            _curator = CuratorUtils.newCuratorStarted(conf, servers, port, 
rootDir, auth);
+            _curator = CuratorUtils.newCuratorStarted(conf, servers, port, 
rootDir, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
         } catch (Exception e) {
            throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java 
b/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
index a3c2558..d894a9c 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.utils;
 
+import org.apache.curator.framework.api.ACLProvider;
 import org.apache.storm.Config;
 
 import org.apache.commons.lang.StringUtils;
@@ -26,6 +27,7 @@ import 
org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
 import org.apache.curator.ensemble.exhibitor.Exhibitors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,15 +38,18 @@ import java.util.Map;
 public class CuratorUtils {
     public static final Logger LOG = 
LoggerFactory.getLogger(CuratorUtils.class);
 
-    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, String root) {
-        return newCurator(conf, servers, port, root, null);
+    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, String root,
+                                              List<ACL> defaultAcl) {
+        return newCurator(conf, servers, port, root, null, defaultAcl);
     }
 
-    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, ZookeeperAuthInfo auth) {
-        return newCurator(conf, servers, port, "", auth);
+    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, ZookeeperAuthInfo auth,
+                                              List<ACL> defaultAcl) {
+        return newCurator(conf, servers, port, "", auth, defaultAcl);
     }
 
-    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
+    public static CuratorFramework newCurator(Map<String, Object> conf, 
List<String> servers, Object port, String root,
+                                              ZookeeperAuthInfo auth, final 
List<ACL> defaultAcl) {
         List<String> serverPorts = new ArrayList<>();
         for (String zkServer : servers) {
             serverPorts.add(zkServer + ":" + ObjectReader.getInt(port));
@@ -53,6 +58,19 @@ public class CuratorUtils {
         CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
 
         setupBuilder(builder, zkStr, conf, auth);
+        if (defaultAcl != null) {
+            builder.aclProvider(new ACLProvider() {
+                @Override
+                public List<ACL> getDefaultAcl() {
+                    return defaultAcl;
+                }
+
+                @Override
+                public List<ACL> getAclForPath(String s) {
+                    return null;
+                }
+            });
+        }
 
         return builder.build();
     }
@@ -99,15 +117,17 @@ public class CuratorUtils {
         setupBuilder(builder, zkStr, conf, auth);
     }
 
-    public static CuratorFramework newCuratorStarted(Map<String, Object> conf, 
List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
-        CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+    public static CuratorFramework newCuratorStarted(Map<String, Object> conf, 
List<String> servers, Object port,
+                                                     String root, 
ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+        CuratorFramework ret = newCurator(conf, servers, port, root, auth, 
defaultAcl);
         LOG.info("Starting Utils Curator...");
         ret.start();
         return ret;
     }
 
-    public static CuratorFramework newCuratorStarted(Map<String, Object> conf, 
List<String> servers, Object port, ZookeeperAuthInfo auth) {
-        CuratorFramework ret = newCurator(conf, servers, port, auth);
+    public static CuratorFramework newCuratorStarted(Map<String, Object> conf, 
List<String> servers, Object port,
+                                                     ZookeeperAuthInfo auth, 
List<ACL> defaultAcl) {
+        CuratorFramework ret = newCurator(conf, servers, port, auth, 
defaultAcl);
         LOG.info("Starting Utils Curator (2)...");
         ret.start();
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 780d55b..f33f9ed 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -541,17 +541,30 @@ public class Utils {
         return new Id(split[0], split[1]);
     }
 
-    public static List<ACL> getWorkerACL(Map<String, Object> conf) {
-        //This is a work around to an issue with ZK where a sasl super user is 
not super unless there is an open SASL ACL so we are trying to give the correct 
perms
-        if (!isZkAuthenticationConfiguredTopology(conf)) {
-            return null;
-        }
+    /**
+     * Get the ACL for nimbus/supervisor.  The Super User ACL. This assumes 
that security is enabled.
+     * @param conf the config to get the super User ACL from
+     * @return the super user ACL.
+     */
+    public static ACL getSuperUserAcl(Map<String, Object> conf) {
         String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
             throw new IllegalArgumentException("Authentication is enabled but 
" + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
+        return new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, 
Config.STORM_ZOOKEEPER_SUPERACL));
+    }
+
+    /**
+     * Get the ZK ACLs that a worker should use when writing to ZK.
+     * @param conf the config for the topology.
+     * @return the ACLs
+     */
+    public static List<ACL> getWorkerACL(Map<String, Object> conf) {
+        if (!isZkAuthenticationConfiguredTopology(conf)) {
+            return null;
+        }
         ArrayList<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
-        ret.add(new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, 
Config.STORM_ZOOKEEPER_SUPERACL)));
+        ret.add(getSuperUserAcl(conf));
         return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java 
b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
index 42d4417..f8d2c15 100644
--- a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
+++ b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.CuratorListener;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.VersionedData;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.CuratorUtils;
@@ -73,8 +74,9 @@ public class ClientZookeeper {
         _instance.mkdirsImpl(zk, path, acls);
     }
 
-    public static CuratorFramework mkClient(Map<String, Object> conf, 
List<String> servers, Object port, String root, final WatcherCallBack watcher, 
Map<String, Object> authConf) {
-        return _instance.mkClientImpl(conf, servers, port, root, watcher, 
authConf);
+    /**
+     * Create a ZooKeeper client by delegating to mkClientImpl on the shared _instance.
+     * All arguments are passed through unchanged.
+     * @param conf the config handed to mkClientImpl.
+     * @param servers the ZooKeeper servers to connect to.
+     * @param port the ZooKeeper port.
+     * @param root the root path for the client.
+     * @param watcher the watcher callback handed to mkClientImpl.
+     * @param authConf the config used to build the ZK auth info; may be null.
+     * @param type the daemon type; mkClientImpl uses its default ZK ACLs for the client.
+     * @return the client returned by mkClientImpl.
+     */
+    public static CuratorFramework mkClient(Map<String, Object> conf, 
List<String> servers, Object port,
+                                            String root, final WatcherCallBack 
watcher, Map<String, Object> authConf, DaemonType type) {
+        return _instance.mkClientImpl(conf, servers, port, root, watcher, 
authConf, type);
     }
 
     // Deletes the state inside the zookeeper for a key, for which the
@@ -300,33 +302,13 @@ public class ClientZookeeper {
         }
     }
 
-    /**
-     * connect ZK, register Watch/unhandle Watch
-     *
-     * @return
-     */
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, String root, final WatcherCallBack watcher) {
-        return mkClientImpl(conf, servers, port, root, watcher, null);
-    }
-
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, String root) {
-        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack());
-    }
-
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, Map<String, Object> authConf) {
-        return mkClientImpl(conf, servers, port, "", new 
DefaultWatcherCallBack(), authConf);
-    }
-
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, String root, Map<String, Object> authConf) {
-        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack(), authConf);
-    }
-
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, String root, final WatcherCallBack watcher, 
Map<String, Object> authConf) {
+    public  CuratorFramework mkClientImpl(Map<String, Object> conf, 
List<String> servers, Object port, String root,
+                                          final WatcherCallBack watcher, 
Map<String, Object> authConf, DaemonType type) {
         CuratorFramework fk;
         if (authConf != null) {
-            fk = CuratorUtils.newCurator(conf, servers, port, root, new 
ZookeeperAuthInfo(authConf));
+            fk = CuratorUtils.newCurator(conf, servers, port, root, new 
ZookeeperAuthInfo(authConf), type.getDefaultZkAcls(conf));
         } else {
-            fk = CuratorUtils.newCurator(conf, servers, port, root);
+            fk = CuratorUtils.newCurator(conf, servers, port, root, null, 
type.getDefaultZkAcls(conf));
         }
 
         fk.getCuratorListenable().addListener(new CuratorListener() {

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
index efd7054..5ec507c 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,7 +45,8 @@ public class CuratorUtilsTest {
         config.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, expectedRetries);
         config.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 
expectedCeiling);
 
-        CuratorFramework curator = CuratorUtils.newCurator(config, 
Arrays.asList("bogus_server"), 42 /*port*/, "");
+        CuratorFramework curator = CuratorUtils.newCurator(config, 
Arrays.asList("bogus_server"), 42, "",
+            DaemonType.WORKER.getDefaultZkAcls(config));
         StormBoundedExponentialBackoffRetry policy =
                 (StormBoundedExponentialBackoffRetry) 
curator.getZookeeperClient().getRetryPolicy();
         Assert.assertEquals(policy.getBaseSleepTimeMs(), expectedInterval);

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj 
b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 4805729..6bd5570 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -15,28 +15,19 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.shell-submission
   (:import [org.apache.storm Config StormSubmitter]
-           [org.apache.storm.utils ServerUtils]
-           [org.apache.storm.zookeeper Zookeeper])
+           [org.apache.storm.utils ServerUtils])
   (:use [org.apache.storm util config log])
-  (:require [clojure.string :as str])
-  (:import [org.apache.storm.callback DefaultWatcherCallBack]
-           [org.apache.storm.utils ConfigUtils]
-           [org.apache.storm.zookeeper Zookeeper ClientZookeeper])
+  (:import [org.apache.storm.utils ConfigUtils NimbusClient])
   (:gen-class))
 
 
 (defn -main [^String tmpjarpath & args]
-  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        servers (.get conf Config/STORM_ZOOKEEPER_SERVERS)
-        port (.get conf Config/STORM_ZOOKEEPER_PORT)
-        root (.get conf Config/STORM_ZOOKEEPER_ROOT)
-        zk (ClientZookeeper/mkClient conf servers port root 
(DefaultWatcherCallBack.) conf)
-        ; since this is not a purpose to add to leader lock queue, passing nil 
as blob-store and topo cache is ok
-        zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil nil nil)
-        leader-nimbus (.getLeader zk-leader-elector)
-        host (.getHost leader-nimbus)
-        port (.getPort leader-nimbus)
-        no-op (.close zk-leader-elector)
-        jarpath (StormSubmitter/submitJar conf tmpjarpath)
-        args (concat args [host port jarpath])]
-    (ServerUtils/execCommand args)))
+  ;; Ask the configured NimbusClient for the leader, submit the jar, then run the
+  ;; given command with the leader host, leader port and submitted jar path appended.
+  ;; with-open closes the nimbus client once the command has been executed.
+  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
+    (with-open [client (NimbusClient/getConfiguredClient conf)]
+      (let [c (.getClient client)
+            ns (.getLeader c)
+            host (.get_host ns)
+            port (.get_port ns)
+            jarpath (StormSubmitter/submitJar conf tmpjarpath)
+            args (concat args [host port jarpath])]
+        (ServerUtils/execCommand args)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj 
b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 1b2f3a2..31b74b5 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -205,21 +205,21 @@
           base1 (mkStormBase "/tmp/storm1" 1 TopologyStatus/ACTIVE 2)
           base2 (mkStormBase "/tmp/storm2" 2 TopologyStatus/ACTIVE 2)]
       (is (= [] (.assignments state nil)))
-      (.setAssignment state "storm1" assignment1)
+      (.setAssignment state "storm1" assignment1 {})
       (is (= assignment1 (.assignmentInfo state "storm1" nil)))
       (is (= nil (.assignmentInfo state "storm3" nil)))
-      (.setAssignment state "storm1" assignment2)
-      (.setAssignment state "storm3" assignment1)
+      (.setAssignment state "storm1" assignment2 {})
+      (.setAssignment state "storm3" assignment1 {})
       (is (= #{"storm1" "storm3"} (set (.assignments state nil))))
       (is (= assignment2 (.assignmentInfo state "storm1" nil)))
       (is (= assignment1 (.assignmentInfo state "storm3" nil)))
 
       (is (= [] (.activeStorms state)))
-      (.activateStorm state "storm1" base1)
+      (.activateStorm state "storm1" base1 {})
       (is (= ["storm1"] (.activeStorms state)))
       (is (= base1 (.stormBase state "storm1" nil)))
       (is (= nil (.stormBase state "storm2" nil)))
-      (.activateStorm state "storm2" base2)
+      (.activateStorm state "storm2" base2 {})
       (is (= base1 (.stormBase state "storm1" nil)))
       (is (= base2 (.stormBase state "storm2" nil)))
       (is (= #{"storm1" "storm2"} (set (.activeStorms state))))
@@ -353,7 +353,8 @@
           curator-frameworke (reify CuratorFramework (^void close [this] nil))]
       ;; No need for when clauses because we just want to return nil
       (with-open [_ (MockedClientZookeeper. zk-mock)]
-        (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) 
(Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/any))) (thenReturn 
curator-frameworke))
+        (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) 
(Mockito/any) (Mockito/anyString) (Mockito/any)
+         (Mockito/any) (Mockito/any))) (thenReturn curator-frameworke))
         (ClusterUtils/mkStateStorage {} nil (ClusterStateContext.))
         (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) 
(Mockito/anyString) (Mockito/eq nil))))
     (let [distributed-state-storage (reify IStateStorage

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index d2066e8..a974231 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1777,7 +1777,7 @@
         (.thenReturn (Mockito/when (.topologyLogConfig cluster-state 
(Mockito/any String) (Mockito/anyObject))) previous-config)
 
         (.setLogConfig nimbus "foo" mock-config)
-        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any 
String) (Mockito/eq expected-config))))))
+        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any 
String) (Mockito/eq expected-config) (Mockito/any Map))))))
 
 (deftest log-level-update-merges-and-flags-existent-log-level
   (let [cluster-state (Mockito/mock IStormClusterState)
@@ -1826,7 +1826,7 @@
         (.thenReturn (Mockito/when (.topologyLogConfig cluster-state 
(Mockito/any String) (Mockito/anyObject))) previous-config)
 
         (.setLogConfig nimbus "foo" mock-config)
-        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any 
String) (Mockito/eq expected-config))))))
+        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any 
String) (Mockito/eq expected-config) (Mockito/any Map))))))
 
 (defn teardown-heartbeats [id])
 (defn teardown-topo-errors [id])

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index c6cff6b..be26ece 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1094,6 +1094,21 @@ public class DaemonConfig implements Validated {
     @isPositiveNumber
     public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = 
"storm.worker.token.life.time.hours";
 
+    /**
+     * In nimbus on startup check if all of the zookeeper ACLs are correct 
before starting.  If not
+     * don't start nimbus.
+     */
+    @isBoolean
+    public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = 
"storm.nimbus.zookeeper.acls.check";
+
+    /**
+     * In nimbus on startup check if all of the zookeeper ACLs are correct 
before starting.  If not do
+     * your best to fix them before nimbus starts, if it cannot fix them 
nimbus will not start.
+     * This overrides any value set for storm.nimbus.zookeeper.acls.check.
+     */
+    @isBoolean
+    public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = 
"storm.nimbus.zookeeper.acls.fixup";
+
     // VALIDATION ONLY CONFIGS
     // Some configs inside Config.java may reference classes we don't want to 
expose in storm-client, but we still want to validate
     // That they reference a valid class.  To allow this to happen we do part 
of the validation on the client side with annotations on

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index 67b7caf..f3842b0 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -28,6 +28,7 @@ import javax.security.auth.Subject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
@@ -52,12 +53,13 @@ public class BlobStoreUtils {
         return BLOBSTORE_SUBTREE;
     }
 
-    public static CuratorFramework createZKClient(Map<String, Object> conf) {
+    /**
+     * Create and start a Curator client for the ZooKeeper servers, port and root found in conf.
+     * @param conf the config to read the ZooKeeper settings and auth info from.
+     * @param type the daemon type whose default ZK ACLs are handed to the new client.
+     * @return the client, already started.
+     */
+    public static CuratorFramework createZKClient(Map<String, Object> conf, 
DaemonType type) {
         @SuppressWarnings("unchecked")
         List<String> zkServers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
-        CuratorFramework zkClient = CuratorUtils.newCurator(conf, zkServers, 
port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+        CuratorFramework zkClient = CuratorUtils.newCurator(conf, zkServers, 
port,
+            (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, 
type.getDefaultZkAcls(conf));
         zkClient.start();
         return zkClient;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 2f73334..831fb78 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.blobstore;
 
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
@@ -83,7 +84,7 @@ public class LocalFsBlobStore extends BlobStore {
     public void prepare(Map<String, Object> conf, String overrideBase, 
NimbusInfo nimbusInfo) {
         this.conf = conf;
         this.nimbusInfo = nimbusInfo;
-        zkClient = BlobStoreUtils.createZKClient(conf);
+        zkClient = BlobStoreUtils.createZKClient(conf, DaemonType.NIMBUS);
         if (overrideBase == null) {
             overrideBase = ConfigUtils.absoluteStormBlobStoreDir(conf);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index deeffc2..6581e04 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -200,6 +200,7 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Utils.UptimeComputer;
 import org.apache.storm.utils.VersionInfo;
 import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.zookeeper.AclEnforcement;
 import org.apache.storm.zookeeper.ClientZookeeper;
 import org.apache.storm.zookeeper.Zookeeper;
 import org.apache.thrift.TException;
@@ -1056,6 +1057,11 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     public static Nimbus launch(INimbus inimbus) throws Exception {
         Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
                 ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
+        // Read the flags null-safely: a bare (boolean) cast throws NullPointerException while
+        // unboxing when a flag is missing from the merged config. A missing flag means "off".
+        boolean fixupAcl = ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP), false);
+        // Fixing up implies checking, so fixup overrides the plain check flag.
+        boolean checkAcl = fixupAcl || ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK), false);
+        if (checkAcl) {
+            AclEnforcement.verifyAcls(conf, fixupAcl);
+        }
         return launchServer(conf, inimbus);
     }
     
@@ -1121,7 +1127,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         String root = (String)conf.get(Config.STORM_ZOOKEEPER_ROOT);
         CuratorFramework ret = null;
         if (servers != null && port != null) {
-            ret = ClientZookeeper.mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), conf);
+            ret = ClientZookeeper.mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), conf, DaemonType.NIMBUS);
         }
         return ret;
     }
@@ -2082,7 +2088,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                     TopologyDetails td = tds.get(id);
                     if (td != null) {
                         currentAssignment.set_owner(td.getTopologySubmitter());
-                        state.setAssignment(id, currentAssignment);
+                        state.setAssignment(id, currentAssignment, 
td.getConf());
                     }
                 }
                 existingAssignments.put(id, currentAssignment);
@@ -2191,12 +2197,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 String topoId = entry.getKey();
                 Assignment assignment = entry.getValue();
                 Assignment existingAssignment = 
existingAssignments.get(topoId);
-                //NOT Used TopologyDetails topologyDetails = 
topologies.getById(topoId);
+                TopologyDetails td = topologies.getById(topoId);
                 if (assignment.equals(existingAssignment)) {
                     LOG.debug("Assignment for {} hasn't changed", topoId);
                 } else {
                     LOG.info("Setting new assignment for topology id {}: {}", 
topoId, assignment);
-                    state.setAssignment(topoId, assignment);
+                    state.setAssignment(topoId, assignment, td.getConf());
                 }
             }
 
@@ -2269,7 +2275,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         base.set_owner(owner);
         base.set_principal(principal);
         base.set_component_debug(new HashMap<>());
-        state.activateStorm(topoId, base);
+        state.activateStorm(topoId, base, topoConf);
         notifyTopologyActionListener(topoName, "activate");
     }
     
@@ -3075,9 +3081,10 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 LOG.info("uploadedJar {}", uploadedJarLocation);
                 setupStormCode(conf, topoId, uploadedJarLocation, 
totalConfToSave, topology);
                 waitForDesiredCodeReplication(totalConf, topoId);
-                state.setupHeatbeats(topoId);
+                state.setupHeatbeats(topoId, topoConf);
+                state.setupErrors(topoId, topoConf);
                 if 
(ObjectReader.getBoolean(totalConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), 
false)) {
-                    state.setupBackpressure(topoId);
+                    state.setupBackpressure(topoId, topoConf);
                 }
                 notifyTopologyActionListener(topoName, "submitTopology");
                 TopologyStatus status = null;
@@ -3267,7 +3274,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 }
             }
             LOG.info("Setting log config for {}:{}", topoName, 
mergedLogConfig);
-            state.setTopologyLogConfig(topoId, mergedLogConfig);
+            state.setTopologyLogConfig(topoId, mergedLogConfig, topoConf);
         } catch (Exception e) {
             LOG.warn("set log config topology exception. (topology id='{}')", 
topoId, e);
             if (e instanceof TException) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 90e5451..43c125d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -33,8 +33,6 @@ import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8ffa920d/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java 
b/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java
new file mode 100644
index 0000000..96bce15
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.zookeeper;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.security.auth.Subject;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is code intended to enforce ZK ACLs.
+ */
+public class AclEnforcement {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AclEnforcement.class);
+
+    /**
+     * Verify the ZK ACLs are correct and optionally fix them if needed.
+     *
+     * <p>Returns without checking anything when ZK authentication is not configured for the storm
+     * server, or when the storm root path does not exist yet. Otherwise the root is verified first,
+     * then the sub-trees under it. The ZK credentials of each topology listed under the storms
+     * sub-tree are derived from the topology conf stored in the blob store.
+     *
+     * @param conf the cluster config.
+     * @param fixUp true if we want to fix the ACLs else false.
+     * @throws IllegalStateException if a stored topology conf has no ZK auth payload.
+     * @throws Exception on any error.
+     */
+    public static void verifyAcls(Map<String, Object> conf, final boolean fixUp) throws Exception {
+        if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            LOG.info("SECURITY IS DISABLED NO FURTHER CHECKS...");
+            //There is no security so we are done.
+            return;
+        }
+        ACL superUserAcl = Utils.getSuperUserAcl(conf);
+        List<ACL> superAcl = new ArrayList<>(1);
+        superAcl.add(superUserAcl);
+
+        List<ACL> drpcFullAcl = new ArrayList<>(2);
+        drpcFullAcl.add(superUserAcl);
+
+        String drpcAclString = (String)conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL);
+        if (drpcAclString != null) {
+            Id drpcAclId = Utils.parseZkId(drpcAclString, Config.STORM_ZOOKEEPER_DRPC_ACL);
+            ACL drpcUserAcl = new ACL(ZooDefs.Perms.READ, drpcAclId);
+            drpcFullAcl.add(drpcUserAcl);
+        }
+
+        List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        int port = ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
+        String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+
+        // This first client uses an empty root so that the storm root node itself can be inspected.
+        try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, "",
+            new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) {
+            if (zk.checkExists().forPath(stormRoot) != null) {
+                //First off we want to verify that ROOT is good
+                verifyAclStrict(zk, superAcl, stormRoot, fixUp);
+            } else {
+                LOG.warn("{} does not exist no need to check any more...", stormRoot);
+                return;
+            }
+        }
+
+        // Now that the root is fine we can start to look at the other paths under it.
+        try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, stormRoot,
+            new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) {
+            //Next verify that the blob store is correct before we start it up.
+            if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != null) {
+                verifyAclStrictRecursive(zk, superAcl, ClusterUtils.BLOBSTORE_SUBTREE, fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE) != null) {
+                verifyAclStrict(zk, superAcl, ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE, fixUp);
+            }
+
+            //The blobstore is good, now lets get the list of all topo Ids
+            Set<String> topoIds = new HashSet<>();
+            if (zk.checkExists().forPath(ClusterUtils.STORMS_SUBTREE) != null) {
+                topoIds.addAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+            }
+
+            Map<String, Id> topoToZkCreds = new HashMap<>();
+            //Now lets get the creds for the topos so we can verify those as well.
+            BlobStore bs = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf));
+            try {
+                Subject nimbusSubject = new Subject();
+                nimbusSubject.getPrincipals().add(new NimbusPrincipal());
+                for (String topoId: topoIds) {
+                    try {
+                        String blobKey = topoId + "-stormconf.ser";
+                        Map<String, Object> topoConf = Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject));
+                        String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+                        if (payload == null) {
+                            // generateDigest(null) would fail with a bare NullPointerException that
+                            // names neither the topology nor the missing setting; fail with context instead.
+                            throw new IllegalStateException("Topology " + topoId + " has no "
+                                + Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD + " set in its stored conf,"
+                                + " cannot work out the ZK ACLs for it.");
+                        }
+                        try {
+                            topoToZkCreds.put(topoId, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+                        } catch (NoSuchAlgorithmException e) {
+                            throw new RuntimeException(e);
+                        }
+                    } catch (KeyNotFoundException knf) {
+                        // No stored conf for this id: it gets no credentials entry.
+                        LOG.debug("topo removed {}", topoId, knf);
+                    }
+                }
+            } finally {
+                if (bs != null) {
+                    bs.shutdown();
+                }
+            }
+
+            verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.STORMS_SUBTREE, topoToZkCreds, fixUp);
+            verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.ASSIGNMENTS_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on credentials where they can be leaked in some versions of storm.
+            verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.CREDENTIALS_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on logconfig where they can be leaked in some versions of storm.
+            verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.LOGCONFIG_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on backpressure too...
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.BACKPRESSURE_SUBTREE, topoToZkCreds, fixUp);
+
+            if (zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) != null) {
+                //errors is a bit special because in older versions of storm the worker created the parent directories lazily
+                // because of this it means we need to auto create at least the topo-id directory for all running topos.
+                for (String topoId : topoToZkCreds.keySet()) {
+                    String path = ClusterUtils.errorStormRoot(topoId);
+                    if (zk.checkExists().forPath(path) == null) {
+                        LOG.warn("Creating missing errors location {}", path);
+                        zk.create().withACL(getTopoReadWrite(path, topoId, topoToZkCreds, superUserAcl, fixUp)).forPath(path);
+                    }
+                }
+            }
+            //Error should not be leaked according to the code, but they are not important enough to fail the build if
+            // for some odd reason they are leaked.
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.ERRORS_SUBTREE, topoToZkCreds, fixUp);
+
+            if (zk.checkExists().forPath(ClusterUtils.SECRET_KEYS_SUBTREE) != null) {
+                verifyAclStrict(zk, superAcl, ClusterUtils.SECRET_KEYS_SUBTREE, fixUp);
+                verifyAclStrictRecursive(zk, superAcl, ClusterUtils.secretKeysPath(WorkerTokenServiceType.NIMBUS), fixUp);
+                verifyAclStrictRecursive(zk, drpcFullAcl, ClusterUtils.secretKeysPath(WorkerTokenServiceType.DRPC), fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.NIMBUSES_SUBTREE) != null) {
+                verifyAclStrictRecursive(zk, superAcl, ClusterUtils.NIMBUSES_SUBTREE, fixUp);
+            }
+
+            if (zk.checkExists().forPath("/leader-lock") != null) {
+                verifyAclStrictRecursive(zk, superAcl, "/leader-lock", fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.PROFILERCONFIG_SUBTREE) != null) {
+                verifyAclStrictRecursive(zk, superAcl, ClusterUtils.PROFILERCONFIG_SUBTREE, fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.SUPERVISORS_SUBTREE) != null) {
+                verifyAclStrictRecursive(zk, superAcl, ClusterUtils.SUPERVISORS_SUBTREE, fixUp);
+            }
+
+            // When moving to pacemaker workerbeats can be leaked too...
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.WORKERBEATS_SUBTREE, topoToZkCreds, fixUp);
+        }
+    }
+
+    /**
+     * Build the ACL list expected on a node owned by a topology: the super
+     * user entry first, then an entry granting {@code perms} to the id
+     * registered for the topology.
+     *
+     * @param path node being checked; only used in the error message
+     * @param topoId topology whose id is looked up
+     * @param topoToZkCreds mapping from topology id to its ZooKeeper id
+     * @param superAcl entry placed first in the returned list
+     * @param fixUp when true the error message adds that no automatic
+     *     fix is available
+     * @param perms permission bits granted to the topology's id
+     * @return a new list holding {@code superAcl} and the topology's entry
+     * @throws IllegalStateException if no id is mapped for {@code topoId}
+     */
+    private static List<ACL> getTopoAcl(String path, String topoId,
+            Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp,
+            int perms) {
+        Id topoCreds = topoToZkCreds.get(topoId);
+        if (topoCreds != null) {
+            List<ACL> acls = new ArrayList<>(2);
+            acls.add(superAcl);
+            acls.add(new ACL(perms, topoCreds));
+            return acls;
+        }
+
+        // No credentials known for this topology: nothing can be built.
+        String hint = fixUp
+            ? " Don't know how to fix this automatically."
+                + " Please add needed ACLs, or delete the path."
+            : "";
+        throw new IllegalStateException(
+            "Could not find credentials for topology " + topoId
+                + " at path " + path + "." + hint);
+    }
+
+    private static List<ACL> getTopoReadWrite(String path, String topoId, 
Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp) {
+        return getTopoAcl(path, topoId, topoToZkCreds, superAcl, fixUp, 
ZooDefs.Perms.ALL);
+    }
+
+    /**
+     * Verify a subtree whose root must carry only the super user ACL and
+     * whose direct children are named after topology ids. Children that
+     * belong to a known topology are verified recursively against that
+     * topology's ACL; children that belong to no topology are deleted.
+     *
+     * <p>Note that dead children are deleted whether or not {@code fixUp}
+     * is set. Does nothing if {@code path} does not exist.
+     *
+     * @param zk client used for all ZooKeeper operations
+     * @param superUserAcl the only entry expected on {@code path} itself
+     * @param path root of the subtree to verify
+     * @param topoToZkCreds mapping from topology id to its ZooKeeper id
+     * @param fixUp passed through to the ACL checks
+     * @param perms permission bits each topology gets on its own child
+     * @throws Exception on any ZooKeeper failure or ACL mismatch
+     */
+    private static void verifyParentWithTopoChildrenDeleteDead(
+            CuratorFramework zk, ACL superUserAcl, String path,
+            Map<String, Id> topoToZkCreds, boolean fixUp, int perms)
+            throws Exception {
+        if (zk.checkExists().forPath(path) == null) {
+            return;
+        }
+        verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+
+        Set<String> possiblyBadIds = new HashSet<>();
+        for (String topoId : zk.getChildren().forPath(path)) {
+            if (!topoToZkCreds.containsKey(topoId)) {
+                //Save it to try again later...
+                possiblyBadIds.add(topoId);
+                continue;
+            }
+            String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+            // Report the child node (not the parent) if the lookup fails.
+            List<ACL> topoAcl = getTopoAcl(childPath, topoId, topoToZkCreds,
+                superUserAcl, fixUp, perms);
+            verifyAclStrictRecursive(zk, topoAcl, childPath, fixUp);
+        }
+
+        if (possiblyBadIds.isEmpty()) {
+            return;
+        }
+        //Lets reread the children in STORMS as the source of truth and see
+        // if a new one was created in the background
+        possiblyBadIds.removeAll(
+            zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+        for (String topoId : possiblyBadIds) {
+            //Now we know for sure that this is a bad id
+            String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+            try {
+                zk.delete().deletingChildrenIfNeeded().forPath(childPath);
+            } catch (KeeperException.NoNodeException ne) {
+                // Someone else cleaned it up first; the goal is met.
+                LOG.debug("{} was already removed", childPath, ne);
+            }
+        }
+    }
+
+    private static void 
verifyParentWithReadOnlyTopoChildrenDeleteDead(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                             Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, 
topoToZkCreds, fixUp, ZooDefs.Perms.READ);
+    }
+
+    private static void 
verifyParentWithReadWriteTopoChildrenDeleteDead(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                              Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, 
topoToZkCreds, fixUp, ZooDefs.Perms.ALL);
+    }
+
+    private static void verifyParentWithTopoChildren(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                     Map<String, Id> 
topoToZkCreds, boolean fixUp, int perms) throws Exception {
+        if (zk.checkExists().forPath(path) != null) {
+            verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+            for (String topoId : zk.getChildren().forPath(path)) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+                List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, 
superUserAcl, fixUp, perms);
+                verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp);
+            }
+        }
+    }
+
+    private static void verifyParentWithReadOnlyTopoChildren(CuratorFramework 
zk, ACL superUserAcl, String path,
+                                                             Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, 
fixUp, ZooDefs.Perms.READ);
+    }
+
+    private static void verifyParentWithReadWriteTopoChildren(CuratorFramework 
zk, ACL superUserAcl, String path,
+                                                              Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, 
fixUp, ZooDefs.Perms.ALL);
+    }
+
+    /**
+     * Verify that {@code path} and every node below it carry exactly
+     * {@code strictAcl}.
+     *
+     * <p>A node that disappears while it is being checked is skipped, the
+     * same way {@code verifyAclStrict} treats it, instead of aborting the
+     * whole verification.
+     *
+     * @param zk client used for all ZooKeeper operations
+     * @param strictAcl the ACL every visited node must have
+     * @param path root of the subtree to verify
+     * @param fixUp passed through to {@code verifyAclStrict}
+     * @throws Exception on any other ZooKeeper failure or ACL mismatch
+     */
+    private static void verifyAclStrictRecursive(CuratorFramework zk,
+            List<ACL> strictAcl, String path, boolean fixUp)
+            throws Exception {
+        verifyAclStrict(zk, strictAcl, path, fixUp);
+
+        List<String> children;
+        try {
+            children = zk.getChildren().forPath(path);
+        } catch (KeeperException.NoNodeException ne) {
+            // The node vanished after (or during) its own ACL check, so
+            // there is nothing left underneath it to verify.
+            LOG.debug("{} removed in the middle of checking it", path, ne);
+            return;
+        }
+        for (String child : children) {
+            String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+            verifyAclStrictRecursive(zk, strictAcl, childPath, fixUp);
+        }
+    }
+
+    /**
+     * Verify that the node at {@code path} carries an ACL equivalent to
+     * {@code strictAcl}.
+     *
+     * <p>On a mismatch the ACL is overwritten when {@code fixUp} is true;
+     * otherwise an {@link IllegalStateException} is thrown. A node that
+     * no longer exists is logged at debug level and ignored.
+     *
+     * @param zk client used for all ZooKeeper operations
+     * @param strictAcl the ACL the node must have
+     * @param path node to check
+     * @param fixUp whether to repair a wrong ACL instead of failing
+     * @throws IllegalStateException on a mismatch when {@code fixUp} is false
+     * @throws Exception on any other ZooKeeper failure
+     */
+    private static void verifyAclStrict(CuratorFramework zk,
+            List<ACL> strictAcl, String path, boolean fixUp)
+            throws Exception {
+        try {
+            List<ACL> foundAcl = zk.getACL().forPath(path);
+            if (equivalent(foundAcl, strictAcl)) {
+                return;
+            }
+            if (!fixUp) {
+                throw new IllegalStateException(path
+                    + " did not have the correct ACL found " + foundAcl
+                    + " expected " + strictAcl);
+            }
+            LOG.warn("{} expected to have ACL {}, but has {}.  Fixing...",
+                path, strictAcl, foundAcl);
+            zk.setACL().withACL(strictAcl).forPath(path);
+        } catch (KeeperException.NoNodeException ne) {
+            // The path fills the placeholder; the exception is passed
+            // last so the logger records it as the cause.
+            LOG.debug("{} removed in the middle of checking it", path, ne);
+        }
+    }
+
+    /**
+     * Decide whether two ACL lists grant the same thing, ignoring order
+     * and repeated entries.
+     *
+     * <p>The lists are compared as sets. Comparing sizes and checking
+     * containment in one direction only gives wrong answers as soon as
+     * either list holds a duplicate entry: {@code [x, x]} would match
+     * {@code [x, y]} and would not match {@code [x]}.
+     *
+     * @param a first ACL list
+     * @param b second ACL list
+     * @return true if both lists contain the same distinct entries
+     */
+    private static boolean equivalent(List<ACL> a, List<ACL> b) {
+        Set<ACL> left = new HashSet<>(a);
+        Set<ACL> right = new HashSet<>(b);
+        return left.equals(right);
+    }
+
+    /**
+     * Command line entry point: reads the storm configuration and runs
+     * {@code verifyAcls} against it.
+     *
+     * @param args zero or more of {@code -f} / {@code --fixup} (case
+     *     insensitive), which turn on fix-up mode
+     * @throws IllegalArgumentException on any other argument
+     * @throws Exception if verification fails
+     */
+    public static void main(String [] args) throws Exception {
+        Map<String, Object> conf = Utils.readStormConfig();
+        boolean fixUp = false;
+        for (String arg : args) {
+            // equalsIgnoreCase does not depend on the default locale,
+            // unlike toLowerCase() (e.g. "--FIXUP" under a Turkish locale).
+            boolean isFixUpFlag = "-f".equalsIgnoreCase(arg)
+                || "--fixup".equalsIgnoreCase(arg);
+            if (!isFixUpFlag) {
+                throw new IllegalArgumentException("Unsupported argument "
+                    + arg + " only -f or --fixup is supported.");
+            }
+            fixUp = true;
+        }
+        verifyAcls(conf, fixUp);
+    }
+}
\ No newline at end of file

Reply via email to