Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch 58f7aefb8 -> 22a962073


STORM-3026: Upgrade ZK instance for security


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22a96207
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22a96207
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22a96207

Branch: refs/heads/1.1.x-branch
Commit: 22a962073c5f12dc5ab281a15d93eb5efc31ab6b
Parents: 58f7aef
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Apr 13 16:46:42 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Apr 13 16:46:42 2018 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +
 storm-core/src/clj/org/apache/storm/cluster.clj |  61 ++--
 .../cluster_state/zookeeper_state_factory.clj   |   4 +-
 .../apache/storm/command/shell_submission.clj   |  24 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  41 ++-
 .../src/clj/org/apache/storm/zookeeper.clj      |   8 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  15 +
 .../apache/storm/blobstore/BlobStoreUtils.java  |   9 +-
 .../storm/blobstore/BlobSynchronizer.java       |   3 +-
 .../storm/blobstore/KeySequenceNumber.java      |   5 +-
 .../storm/blobstore/LocalFsBlobStore.java       |   8 +-
 .../org/apache/storm/cluster/ClusterUtils.java  |  30 +-
 .../storm/cluster/IStormClusterState.java       |  14 +-
 .../storm/cluster/StormClusterStateImpl.java    |  38 ++-
 .../apache/storm/cluster/ZKStateStorage.java    |  14 +-
 .../daemon/supervisor/SupervisorUtils.java      |   5 +-
 .../transactional/state/TransactionalState.java |   4 +-
 .../topology/state/TransactionalState.java      |   4 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  69 +++--
 .../apache/storm/zookeeper/AclEnforcement.java  | 301 +++++++++++++++++++
 .../org/apache/storm/zookeeper/Zookeeper.java   |  37 +--
 .../test/clj/org/apache/storm/cluster_test.clj  |  10 +-
 .../test/clj/org/apache/storm/utils_test.clj    |   2 +-
 23 files changed, 568 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 46a4d87..d8029ff 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -48,6 +48,8 @@ storm.messaging.transport: 
"org.apache.storm.messaging.netty.Context"
 storm.nimbus.retry.times: 5
 storm.nimbus.retry.interval.millis: 2000
 storm.nimbus.retry.intervalceiling.millis: 60000
+storm.nimbus.zookeeper.acls.check: true
+storm.nimbus.zookeeper.acls.fixup: true
 storm.auth.simple-white-list.users: []
 storm.auth.simple-acl.users: []
 storm.auth.simple-acl.users.commands: []

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj 
b/storm-core/src/clj/org/apache/storm/cluster.clj
index 810b3c3..f1a0412 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -30,13 +30,22 @@
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.daemon [common :as common]]))
 
-(defn mk-topo-only-acls
-  [topo-conf]
+(defn mk-topo-acls
+  [topo-conf type]
   (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
     (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
       [(first ZooDefs$Ids/CREATOR_ALL_ACL)
-       (ACL. ZooDefs$Perms/READ (Id. "digest" 
(DigestAuthenticationProvider/generateDigest payload)))])))
- 
+       (ACL. type (Id. "digest" (DigestAuthenticationProvider/generateDigest 
payload)))])))
+
+(defn mk-topo-read-write-acls
+  [topo-conf]
+  (mk-topo-acls topo-conf ZooDefs$Perms/ALL))
+
+(defn mk-topo-read-only-acls
+  [topo-conf]
+  [topo-conf]
+  (mk-topo-acls topo-conf ZooDefs$Perms/READ))
+
 (defnk mk-distributed-cluster-state
   [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
   (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
@@ -68,26 +77,27 @@
   (executor-beats [this storm-id executor->node+port])
   (supervisors [this callback])
   (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
-  (setup-heartbeats! [this storm-id])
+  (setup-heartbeats! [this storm-id topo-conf])
   (teardown-heartbeats! [this storm-id])
   (teardown-topology-errors! [this storm-id])
   (heartbeat-storms [this])
   (error-topologies [this])
   (backpressure-topologies [this])
-  (set-topology-log-config! [this storm-id log-config])
+  (set-topology-log-config! [this storm-id log-config topo-conf])
   (topology-log-config [this storm-id cb])
   (worker-heartbeat! [this storm-id node port info])
   (remove-worker-heartbeat! [this storm-id node port])
   (supervisor-heartbeat! [this supervisor-id info])
   (worker-backpressure! [this storm-id node port info])
   (topology-backpressure [this storm-id callback])
-  (setup-backpressure! [this storm-id])
+  (setup-backpressure! [this storm-id topo-conf])
   (remove-backpressure! [this storm-id])
   (remove-worker-backpressure! [this storm-id node port])
-  (activate-storm! [this storm-id storm-base])
+  (activate-storm! [this storm-id storm-base topo-conf])
   (update-storm! [this storm-id new-elems])
   (remove-storm-base! [this storm-id])
-  (set-assignment! [this storm-id info])
+  (setup-errors! [this storm-id topo-conf])
+  (set-assignment! [this storm-id info topo-conf])
   ;; sets up information related to key consisting of nimbus
   ;; host:port and version info of the blob
   (setup-blobstore! [this key nimbusInfo versionInfo])
@@ -416,8 +426,9 @@
         (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) 
(not-nil? cb)) LogConfig))
 
       (set-topology-log-config!
-        [this storm-id log-config]
-        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize 
log-config) acls))
+        [this storm-id log-config topo-conf]
+        (.mkdirs cluster-state LOGCONFIG-SUBTREE acls)
+        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize 
log-config) (mk-topo-read-only-acls topo-conf)))
 
       (set-worker-profile-request
         [this storm-id profile-request]
@@ -472,8 +483,9 @@
         (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
 
       (setup-heartbeats!
-        [this storm-id]
-        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
+        [this storm-id topo-conf]
+        (.mkdirs cluster-state WORKERBEATS-SUBTREE acls)
+        (.mkdirs cluster-state (workerbeat-storm-root storm-id) 
(mk-topo-read-write-acls topo-conf)))
 
       (teardown-heartbeats!
         [this storm-id]
@@ -506,8 +518,9 @@
               (> (count children) 0)))
       
       (setup-backpressure!
-        [this storm-id]
-        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
+        [this storm-id topo-conf]
+        (.mkdirs cluster-state BACKPRESSURE-SUBTREE acls)
+        (.mkdirs cluster-state (backpressure-storm-root storm-id) 
(mk-topo-read-write-acls topo-conf)))
 
       (remove-backpressure!
         [this storm-id]
@@ -533,9 +546,10 @@
           (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) 
(Utils/serialize thrift-supervisor-info) acls)))
 
       (activate-storm!
-        [this storm-id storm-base]
+        [this storm-id storm-base topo-conf]
         (let [thrift-storm-base (thriftify-storm-base storm-base)]
-          (.set_data cluster-state (storm-path storm-id) (Utils/serialize 
thrift-storm-base) acls)))
+          (.mkdirs cluster-state STORMS-SUBTREE acls)
+          (.set_data cluster-state (storm-path storm-id) (Utils/serialize 
thrift-storm-base) (mk-topo-read-only-acls topo-conf))))
 
       (update-storm!
         [this storm-id new-elems]
@@ -562,9 +576,10 @@
         (.delete_node cluster-state (storm-path storm-id)))
 
       (set-assignment!
-        [this storm-id info]
+        [this storm-id info topo-conf]
+        (.mkdirs cluster-state ASSIGNMENTS-SUBTREE acls)
         (let [thrift-assignment (thriftify-assignment info)]
-          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) acls)))
+          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) (mk-topo-read-only-acls topo-conf))))
 
       (remove-blobstore-key!
         [this blob-key]
@@ -585,9 +600,10 @@
 
       (set-credentials!
          [this storm-id creds topo-conf]
-         (let [topo-acls (mk-topo-only-acls topo-conf)
+         (let [topo-acls (mk-topo-read-only-acls topo-conf)
                path (credentials-path storm-id)
                thriftified-creds (thriftify-credentials creds)]
+           (.mkdirs cluster-state CREDENTIALS-SUBTREE acls)
            (.set_data cluster-state path (Utils/serialize thriftified-creds) 
topo-acls)))
 
       (credentials
@@ -596,6 +612,11 @@
           (swap! credentials-callback assoc storm-id callback))
         (clojurify-crdentials (maybe-deserialize (.get_data cluster-state 
(credentials-path storm-id) (not-nil? callback)) Credentials)))
 
+      (setup-errors!
+         [this storm-id topo-conf]
+         (.mkdirs cluster-state ERRORS-SUBTREE acls)
+         (.mkdirs cluster-state (error-storm-root storm-id) 
(mk-topo-read-write-acls topo-conf)))
+
       (report-error
          [this storm-id component-id node port error]
          (let [path (error-path storm-id component-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj 
b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
index fdef972..d2915eb 100644
--- 
a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
+++ 
b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
@@ -28,7 +28,7 @@
   (ZKStateStorage. conf, auth-conf, acls, context))
 
 (defn -mkState [this conf auth-conf acls context]
-  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
+  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) acls :auth-conf auth-conf)]
     (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
     (.close zk))
   (let [callbacks (atom {})
@@ -36,6 +36,7 @@
         zk-writer (zk/mk-client conf
                          (conf STORM-ZOOKEEPER-SERVERS)
                          (conf STORM-ZOOKEEPER-PORT)
+                         acls
                          :auth-conf auth-conf
                          :root (conf STORM-ZOOKEEPER-ROOT)
                          :watcher (fn [state type path]
@@ -50,6 +51,7 @@
                     (zk/mk-client conf
                          (conf STORM-ZOOKEEPER-SERVERS)
                          (conf STORM-ZOOKEEPER-PORT)
+                         acls
                          :auth-conf auth-conf
                          :root (conf STORM-ZOOKEEPER-ROOT)
                          :watcher (fn [state type path]

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj 
b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 887ab3b..1bb3d10 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -15,20 +15,18 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.shell-submission
   (:import [org.apache.storm StormSubmitter])
-  (:use [org.apache.storm thrift util config log zookeeper])
+  (:use [org.apache.storm util config log])
   (:require [clojure.string :as str])
+  (:import [org.apache.storm.utils ConfigUtils NimbusClient])
   (:gen-class))
 
-
 (defn -main [^String tmpjarpath & args]
-  (let [conf (read-storm-config)
-        ; since this is not a purpose to add to leader lock queue, passing nil 
as blob-store is ok
-        zk-leader-elector (zk-leader-elector conf nil)
-        leader-nimbus (.getLeader zk-leader-elector)
-        host (.getHost leader-nimbus)
-        port (.getPort leader-nimbus)
-        no-op (.close zk-leader-elector)
-        jarpath (StormSubmitter/submitJar conf tmpjarpath)
-        args (concat args [host port jarpath])]
-    (exec-command! (str/join " " args))
-    ))
+  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
+    (with-open [client (NimbusClient/getConfiguredClient conf)]
+      (let [c (.getClient client)
+            ns (.getLeader c)
+            host (.get_host ns)
+            port (.get_port ns)
+            jarpath (StormSubmitter/submitJar conf tmpjarpath)
+            args (concat args [host port jarpath])]
+        (exec-command! (str/join " " args))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 7607b1b..94ce7fd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -56,6 +56,7 @@
                             [stats :as stats]])
   (:require [org.apache.storm.ui.core :as ui])
   (:require [clojure.set :as set])
+  (:import [org.apache.storm.zookeeper AclEnforcement])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm config])
@@ -133,6 +134,16 @@
     scheduler
     ))
 
+(def NIMBUS-ZK-ACLS ZooDefs$Ids/CREATOR_ALL_ACL)
+
+(defn mk-zk-client [conf]
+  (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS)
+        zk-port (conf STORM-ZOOKEEPER-PORT)
+        zk-root (conf STORM-ZOOKEEPER-ROOT)]
+    (if (and zk-servers zk-port)
+      (mk-client conf zk-servers zk-port (if 
(Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil) :root 
zk-root
+      :auth-conf conf))))
+
 (defmulti blob-sync cluster-mode)
 
 (defnk is-leader [nimbus :throw-exception true]
@@ -142,10 +153,6 @@
         (let [leader-address (.getLeader leader-elector)]
           (throw (RuntimeException. (str "not a leader, current leader is " 
leader-address))))))))
 
-(def NIMBUS-ZK-ACLS
-  [(first ZooDefs$Ids/CREATOR_ALL_ACL)
-   (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) 
ZooDefs$Ids/ANYONE_ID_UNSAFE)])
-
 (defn mk-blob-cache-map
   "Constructs a TimeCacheMap instance with a blob store timeout whose
   expiration callback invokes cancel on the value held by an expired entry when
@@ -213,7 +220,7 @@
                                  (exit-process! 20 "Error when processing an 
event")
                                  ))
      :scheduler (mk-scheduler conf inimbus)
-     :leader-elector (zk-leader-elector conf blob-store)
+     :leader-elector (zk-leader-elector conf blob-store (if 
(Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
@@ -454,7 +461,7 @@
 
 (defn- get-version-for-key [key nimbus-host-port-info conf]
   (let [version (KeySequenceNumber. key nimbus-host-port-info)]
-    (.getKeySequenceNumber version conf)))
+    (.getKeySequenceNumber version conf (if 
(Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))))
 
 (defn get-key-seq-from-blob-store [blob-store]
   (let [key-iter (.listKeys blob-store)]
@@ -1002,7 +1009,7 @@
                                                 td (.get tds tid)
                                                 assignment (if (and (not 
(:owner assignment)) (not (nil? td)))
                                                              (let 
[new-assignment (fixup-assignment assignment td)]
-                                                               
(.set-assignment! storm-cluster-state tid new-assignment)
+                                                               
(.set-assignment! storm-cluster-state tid new-assignment (.getConf td))
                                                                new-assignment)
                                                              assignment)]
                                             {tid assignment}))))]
@@ -1060,7 +1067,7 @@
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " 
topology-id ": " (pr-str assignment))
-              (.set-assignment! storm-cluster-state topology-id assignment)
+              (.set-assignment! storm-cluster-state topology-id assignment 
(.getConf topology-details))
               )))
         (->> new-assignments
             (map (fn [[topology-id assignment]]
@@ -1098,7 +1105,8 @@
                                   nil
                                   nil
                                   {}
-                                  principal))
+                                  principal)
+                      storm-conf)
     (notify-topology-action-listener nimbus storm-name "activate")))
 
 (defn storm-active? [storm-cluster-state storm-name]
@@ -1744,9 +1752,10 @@
             (log-message "uploadedJar " uploadedJarLocation)
             (setup-storm-code nimbus conf storm-id uploadedJarLocation 
total-storm-conf topology)
             (wait-for-desired-code-replication nimbus total-storm-conf 
storm-id)
-            (.setup-heartbeats! storm-cluster-state storm-id)
+            (.setup-heartbeats! storm-cluster-state storm-id total-storm-conf)
+            (.setup-errors! storm-cluster-state storm-id total-storm-conf)
             (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
-              (.setup-backpressure! storm-cluster-state storm-id))
+              (.setup-backpressure! storm-cluster-state storm-id 
total-storm-conf))
             (notify-topology-action-listener nimbus storm-name 
"submitTopology")
             (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE 
:inactive
                                             TopologyInitialStatus/ACTIVE 
:active}]
@@ -1892,7 +1901,7 @@
                          (.containsKey named-loggers logger-name))
                   (.remove named-loggers logger-name))))))
         (log-message "Setting log config for " storm-name ":" 
merged-log-config)
-        (.set-topology-log-config! storm-cluster-state id merged-log-config)))
+        (.set-topology-log-config! storm-cluster-state id merged-log-config 
topology-conf)))
 
     (uploadNewCredentials [this storm-name credentials]
       (mark! nimbus:num-uploadNewCredentials-calls)
@@ -2565,8 +2574,12 @@
 (defn -launch [nimbus]
   (let [conf (merge
                (read-storm-config)
-               (read-yaml-config "storm-cluster-auth.yaml" false))]
-  (launch-server! conf nimbus)))
+               (read-yaml-config "storm-cluster-auth.yaml" false))
+        fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-FIXUP)
+        check-acl (or fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-CHECK))]
+    (when check-acl
+      (AclEnforcement/verifyAcls conf fixup-acl))
+    (launch-server! conf nimbus)))
 
 (defn standalone-nimbus []
   (reify INimbus

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj 
b/storm-core/src/clj/org/apache/storm/zookeeper.clj
index ca41093..43795e5 100644
--- a/storm-core/src/clj/org/apache/storm/zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -54,11 +54,11 @@
   (log-message "Zookeeper state update: " state type path))
 
 (defnk mk-client
-  [conf servers port
+  [conf servers port default-acl
    :root ""
    :watcher default-watcher
    :auth-conf nil]
-  (let [fk (Utils/newCurator conf servers port root (when auth-conf 
(ZookeeperAuthInfo. auth-conf)))]
+  (let [fk (Utils/newCurator conf servers port root (when auth-conf 
(ZookeeperAuthInfo. auth-conf)) default-acl)]
     (.. fk
         (getCuratorListenable)
         (addListener
@@ -252,9 +252,9 @@
 
 (defn zk-leader-elector
   "Zookeeper Implementation of ILeaderElector."
-  [conf blob-store]
+  [conf blob-store default-acl]
   (let [servers (conf STORM-ZOOKEEPER-SERVERS)
-        zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf conf)
+        zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) default-acl :auth-conf conf)
         leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
         id (.toHostPortString (NimbusInfo/fromConf conf))
         leader-latch (atom (LeaderLatch. zk leader-lock-path id))

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 370d027..988540f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -53,6 +53,21 @@ public class Config extends HashMap<String, Object> {
     private static final long serialVersionUID = -1550278723792864455L;
 
     /**
+     * In nimbus on startup check if all of the zookeeper ACLs are correct 
before starting.  If not
+     * don't start nimbus.
+     */
+    @isBoolean
+    public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = 
"storm.nimbus.zookeeper.acls.check";
+
+    /**
+     * In nimbus on startup check if all of the zookeeper ACLs are correct 
before starting.  If not do
+     * your best to fix them before nimbus starts, if it cannot fix them 
nimbus will not start.
+     * This overrides any value set for storm.nimbus.zookeeper.acls.check.
+     */
+    @isBoolean
+    public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = 
"storm.nimbus.zookeeper.acls.fixup";
+
+    /**
      * This is part of a temporary workaround to a ZK bug, it is the 
'scheme:acl' for
      * the user Nimbus and Supervisors use to authenticate with ZK.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index f1eb2f4..89bc610 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -33,6 +33,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +52,15 @@ public class BlobStoreUtils {
     private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
     private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreUtils.class);
 
-    public static CuratorFramework createZKClient(Map conf) {
+    public static String getBlobStoreSubtree() {
+        return BLOBSTORE_SUBTREE;
+    }
+
+    public static CuratorFramework createZKClient(Map conf, List<ACL> 
defaultAcls) {
         List<String> zkServers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
-        CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, 
(String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+        CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, 
(String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, defaultAcls);
         zkClient.start();
         return zkClient;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index f035709..9e888e8 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -20,6 +20,7 @@ package org.apache.storm.blobstore;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,7 @@ public class BlobSynchronizer {
     public synchronized void syncBlobs() {
         try {
             LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys 
{}",getBlobStoreKeySet(), getZookeeperKeySet());
-            zkClient = BlobStoreUtils.createZKClient(conf);
+            zkClient = BlobStoreUtils.createZKClient(conf, 
ZooDefs.Ids.CREATOR_ALL_ACL);
             deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), 
getZookeeperKeySet());
             updateKeySetForBlobStore(getBlobStoreKeySet());
             Set<String> keySetToDownload = 
getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index adbd4c4..c175a97 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,9 +133,9 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public synchronized int getKeySequenceNumber(Map conf) throws 
KeyNotFoundException {
+    public synchronized int getKeySequenceNumber(Map conf, List<ACL> 
defaultAcls) throws KeyNotFoundException {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
-        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
+        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf, 
defaultAcls);
         try {
             // Key has not been created yet and it is the first time it is 
being created
             if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) 
== null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index c9326e2..be4c3ad 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -28,6 +28,8 @@ import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +85,11 @@ public class LocalFsBlobStore extends BlobStore {
     public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
         this.conf = conf;
         this.nimbusInfo = nimbusInfo;
-        zkClient = BlobStoreUtils.createZKClient(conf);
+        List<ACL> acl = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acl = ZooDefs.Ids.CREATOR_ALL_ACL;
+        }
+        zkClient = BlobStoreUtils.createZKClient(conf, acl);
         if (overrideBase == null) {
             overrideBase = ConfigUtils.absoluteStormBlobStoreDir(conf);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java 
b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 96c177b..11ab035 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -43,7 +43,6 @@ public class ClusterUtils {
     public static final String ZK_SEPERATOR = "/";
 
     public static final String ASSIGNMENTS_ROOT = "assignments";
-    public static final String CODE_ROOT = "code";
     public static final String STORMS_ROOT = "storms";
     public static final String SUPERVISORS_ROOT = "supervisors";
     public static final String WORKERBEATS_ROOT = "workerbeats";
@@ -91,15 +90,38 @@ public class ClusterUtils {
         _instance = INSTANCE;
     }
 
-    public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws 
NoSuchAlgorithmException {
+    /**
+     * Get ZK ACLs for a topology to have read/write access.
+     * @param topoConf the topology config.
+     * @return the ACLs.
+     */
+    public static List<ACL> mkTopoReadWriteAcls(Map<String, Object> topoConf) {
+        return mkTopoAcls(topoConf, ZooDefs.Perms.ALL);
+    }
+
+    /**
+     * Get ZK ACLs for a topology to have read only access.
+     * @param topoConf the topology config.
+     * @return the ACLs.
+     */
+    public static List<ACL> mkTopoReadOnlyAcls(Map<String, Object> topoConf) {
+        return mkTopoAcls(topoConf, ZooDefs.Perms.READ);
+    }
+
+    private static List<ACL> mkTopoAcls(Map<String, Object> topoConf, int 
perms) {
         List<ACL> aclList = null;
         String payload = (String) 
topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
         if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
             aclList = new ArrayList<>();
             ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
             aclList.add(acl1);
-            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
-            aclList.add(acl2);
+            try {
+                ACL acl2 = new ACL(perms, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
+                aclList.add(acl2);
+            } catch (NoSuchAlgorithmException e) {
+                //Should only happen on a badly configured system
+                throw new RuntimeException(e);
+            }
         }
         return aclList;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java 
b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index a6f07ed..c93a6b2 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -59,7 +59,7 @@ public interface IStormClusterState {
 
     public SupervisorInfo supervisorInfo(String supervisorId); // returns nil 
if doesn't exist
 
-    public void setupHeatbeats(String stormId);
+    public void setupHeatbeats(String stormId, Map<String, Object> topoConf);
 
     public void teardownHeartbeats(String stormId);
 
@@ -71,7 +71,7 @@ public interface IStormClusterState {
 
     public List<String> backpressureTopologies();
 
-    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig, 
Map<String, Object> topoConf);
 
     public LogConfig topologyLogConfig(String stormId, Runnable cb);
 
@@ -85,19 +85,19 @@ public interface IStormClusterState {
 
     public boolean topologyBackpressure(String stormId, Runnable callback);
 
-    public void setupBackpressure(String stormId);
+    public void setupBackpressure(String stormId, Map<String, Object> 
topoConf);
 
     public void removeBackpressure(String stormId);
 
     public void removeWorkerBackpressure(String stormId, String node, Long 
port);
 
-    public void activateStorm(String stormId, StormBase stormBase);
+    public void activateStorm(String stormId, StormBase stormBase, Map<String, 
Object> topoConf);
 
     public void updateStorm(String stormId, StormBase newElems);
 
     public void removeStormBase(String stormId);
 
-    public void setAssignment(String stormId, Assignment info);
+    public void setAssignment(String stormId, Assignment info, Map<String, 
Object> topoConf);
 
     public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer 
versionInfo);
 
@@ -113,11 +113,13 @@ public interface IStormClusterState {
 
     public void reportError(String stormId, String componentId, String node, 
Long port, Throwable error);
 
+    void setupErrors(String stormId, Map<String, Object> topoConf);
+
     public List<ErrorInfo> errors(String stormId, String componentId);
 
     public ErrorInfo lastError(String stormId, String componentId);
 
-    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) throws NoSuchAlgorithmException;
+    public void setCredentials(String stormId, Credentials creds, Map 
topoConf);
 
     public Credentials credentials(String stormId, Runnable callback);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index f9952cf..2d26c2f 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -33,7 +33,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -243,7 +242,6 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String 
node, Long port) {
         byte[] bytes = 
stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), 
false);
         return ClusterUtils.maybeDeserialize(bytes, 
ClusterWorkerHeartbeat.class);
-
     }
 
     @Override
@@ -338,8 +336,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setupHeatbeats(String stormId) {
-        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+    public void setupHeatbeats(String stormId, Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.WORKERBEATS_SUBTREE, acls);
+        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
     }
 
     @Override
@@ -386,8 +385,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
-        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), 
Utils.serialize(logConfig), acls);
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig, 
Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, acls);
+        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), 
Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
     }
 
     @Override
@@ -467,8 +467,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setupBackpressure(String stormId) {
-        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+    public void setupBackpressure(String stormId, Map<String, Object> 
topoConf) {
+        stateStorage.mkdirs(ClusterUtils.BACKPRESSURE_SUBTREE, acls);
+        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
     }
 
     @Override
@@ -495,9 +496,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void activateStorm(String stormId, StormBase stormBase) {
+    public void activateStorm(String stormId, StormBase stormBase, Map<String, 
Object> topoConf) {
         String path = ClusterUtils.stormPath(stormId);
-        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+        stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, acls);
+        stateStorage.set_data(path, Utils.serialize(stormBase), 
ClusterUtils.mkTopoReadOnlyAcls(topoConf));
     }
 
     /**
@@ -589,8 +591,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setAssignment(String stormId, Assignment info) {
-        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), 
Utils.serialize(info), acls);
+    public void setAssignment(String stormId, Assignment info, Map<String, 
Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, acls);
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), 
Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
     }
 
     @Override
@@ -639,6 +642,12 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
+    public void setupErrors(String stormId, Map<String, Object> topoConf) {
+        stateStorage.mkdirs(ClusterUtils.ERRORS_SUBTREE, acls);
+        stateStorage.mkdirs(ClusterUtils.errorStormRoot(stormId), 
ClusterUtils.mkTopoReadWriteAcls(topoConf));
+    }
+
+    @Override
     public void reportError(String stormId, String componentId, String node, 
Long port, Throwable error) {
 
         String path = ClusterUtils.errorPath(stormId, componentId);
@@ -708,11 +717,10 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) throws NoSuchAlgorithmException {
-        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) {
+        List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf);
         String path = ClusterUtils.credentialsPath(stormId);
         stateStorage.set_data(path, Utils.serialize(creds), aclList);
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java 
b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index e337b1f..af15dd9 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -80,15 +80,15 @@ public class ZKStateStorage implements IStateStorage {
             this.isNimbus = true;
 
         // just mkdir STORM_ZOOKEEPER_ROOT dir
-        CuratorFramework zkTemp = mkZk();
+        CuratorFramework zkTemp = mkZk(acls);
         String rootPath = 
String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
         Zookeeper.mkdirs(zkTemp, rootPath, acls);
         zkTemp.close();
 
         active = new AtomicBoolean(true);
-        zkWriter = mkZk(new ZkWatcherCallBack());
+        zkWriter = mkZk(acls, new ZkWatcherCallBack());
         if (isNimbus) {
-            zkReader = mkZk(new ZkWatcherCallBack());
+            zkReader = mkZk(acls, new ZkWatcherCallBack());
         } else {
             zkReader = zkWriter;
         }
@@ -96,15 +96,15 @@ public class ZKStateStorage implements IStateStorage {
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorFramework mkZk() throws IOException {
+    private CuratorFramework mkZk(List<ACL> acls) throws IOException {
         return Zookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS), 
conf.get(Config.STORM_ZOOKEEPER_PORT), "",
-                new DefaultWatcherCallBack(), authConf);
+                new DefaultWatcherCallBack(), authConf, acls);
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorFramework mkZk(WatcherCallBack watcher) throws 
NumberFormatException, IOException {
+    private CuratorFramework mkZk(List<ACL> acls, WatcherCallBack watcher) 
throws NumberFormatException, IOException {
         return Zookeeper.mkClient(conf, (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
-                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), 
watcher, authConf);
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), 
watcher, authConf, acls);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ef4b54d..31d1130 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -312,9 +312,6 @@ public class SupervisorUtils {
     }
     
     static List<ACL> supervisorZkAcls() {
-        final List<ACL> acls = new ArrayList<>();
-        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
-        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
-        return acls;
+        return ZooDefs.Ids.CREATOR_ALL_ACL;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index 7662daa..e470348 100644
--- 
a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -66,7 +66,7 @@ public class TransactionalState {
             List<String> servers = (List<String>) getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
             ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
-            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, 
port, auth);
+            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, 
port, auth, null);
             _zkAcls = Utils.getWorkerACL(conf);
             try {
                 TransactionalState.createNode(initter, transactionalRoot, 
null, null, null);
@@ -78,7 +78,7 @@ public class TransactionalState {
             }
             initter.close();
                                     
-            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, 
auth);
+            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, 
auth, null);
             _ser = new KryoValuesSerializer(conf);
             _des = new KryoValuesDeserializer(conf);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 71068dc..5bdb16c 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -63,7 +63,7 @@ public class TransactionalState {
             List<String> servers = (List<String>) getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, 
Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
             ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
-            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, 
port, auth);
+            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, 
port, auth, null);
             _zkAcls = Utils.getWorkerACL(conf);
             try {
                 TransactionalState.createNode(initter, transactionalRoot, 
null, null, null);
@@ -75,7 +75,7 @@ public class TransactionalState {
             }
             initter.close();
                                     
-            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, 
auth);
+            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, 
auth, null);
         } catch (Exception e) {
            throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index b9ced2c..846205a 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
 import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
 import org.apache.curator.ensemble.exhibitor.Exhibitors;
+import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.storm.Config;
@@ -1079,15 +1080,16 @@ public class Utils {
         return false;
     }
 
-    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, String root) {
-        return newCurator(conf, servers, port, root, null);
+    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, String root, List<ACL> defaultAcl) {
+        return newCurator(conf, servers, port, root, null, defaultAcl);
     }
 
-    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, ZookeeperAuthInfo auth) {
-        return newCurator(conf, servers, port, "", auth);
+    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+        return newCurator(conf, servers, port, "", auth, defaultAcl);
     }
 
-    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, String root, ZookeeperAuthInfo auth) {
+    public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, String root, ZookeeperAuthInfo auth,
+                                              final List<ACL> defaultAcl) {
         List<String> serverPorts = new ArrayList<String>();
         for (String zkServer : servers) {
             serverPorts.add(zkServer + ":" + Utils.getInt(port));
@@ -1096,6 +1098,19 @@ public class Utils {
         CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
 
         setupBuilder(builder, zkStr, conf, auth);
+        if (defaultAcl != null) {
+            builder.aclProvider(new ACLProvider() {
+                @Override
+                public List<ACL> getDefaultAcl() {
+                    return defaultAcl;
+                }
+
+                @Override
+                public List<ACL> getAclForPath(String s) {
+                    return null;
+                }
+            });
+        }
 
         return builder.build();
     }
@@ -1142,15 +1157,15 @@ public class Utils {
         setupBuilder(builder, zkStr, conf, auth);
     }
 
-    public static CuratorFramework newCuratorStarted(Map conf, List<String> 
servers, Object port, String root, ZookeeperAuthInfo auth) {
-        CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+    public static CuratorFramework newCuratorStarted(Map conf, List<String> 
servers, Object port, String root, ZookeeperAuthInfo auth, List<ACL> 
defaultAcl) {
+        CuratorFramework ret = newCurator(conf, servers, port, root, auth, 
defaultAcl);
         LOG.info("Starting Utils Curator...");
         ret.start();
         return ret;
     }
 
-    public static CuratorFramework newCuratorStarted(Map conf, List<String> 
servers, Object port, ZookeeperAuthInfo auth) {
-        CuratorFramework ret = newCurator(conf, servers, port, auth);
+    public static CuratorFramework newCuratorStarted(Map conf, List<String> 
servers, Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+        CuratorFramework ret = newCurator(conf, servers, port, auth, 
defaultAcl);
         LOG.info("Starting Utils Curator (2)...");
         ret.start();
         return ret;
@@ -1227,23 +1242,39 @@ public class Utils {
                 && 
!((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
-
-    public static List<ACL> getWorkerACL(Map conf) {
-        //This is a work around to an issue with ZK where a sasl super user is 
not super unless there is an open SASL ACL so we are trying to give the correct 
perms
-        if (!isZkAuthenticationConfiguredTopology(conf)) {
-            return null;
+    public static Id parseZkId(String id, String configName) {
+        String[] split = id.split(":", 2);
+        if (split.length != 2) {
+            throw new IllegalArgumentException(configName + " does not appear 
to be in the form scheme:acl, i.e. sasl:storm-user");
         }
+        return new Id(split[0], split[1]);
+    }
+
+    /**
+     * Get the ACL for nimbus/supervisor.  The Super User ACL. This assumes 
that security is enabled.
+     * @param conf the config to get the super User ACL from
+     * @return the super user ACL.
+     */
+    public static ACL getSuperUserAcl(Map<String, Object> conf) {
         String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
             throw new IllegalArgumentException("Authentication is enabled but 
" + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
-        String[] split = stormZKUser.split(":", 2);
-        if (split.length != 2) {
-            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL 
+ " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+        return new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, 
Config.STORM_ZOOKEEPER_SUPERACL));
+    }
+
+    /**
+     * Get the ZK ACLs that a worker should use when writing to ZK.
+     * @param conf the config for the topology.
+     * @return the ACLs
+     */
+    public static List<ACL> getWorkerACL(Map<String, Object> conf) {
+        if (!isZkAuthenticationConfiguredTopology(conf)) {
+            return null;
         }
         ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
-        ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
-        return ret;
+        ret.add(getSuperUserAcl(conf));
+       return ret;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java 
b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java
new file mode 100644
index 0000000..b39db99
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.zookeeper;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.security.auth.Subject;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is code intended to enforce ZK ACLs.
+ */
+public class AclEnforcement {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AclEnforcement.class);
+
+    /**
+     * Verify the ZK ACLs are correct and optionally fix them if needed.
+     * @param conf the cluster config.
+     * @param fixUp true if we want to fix the ACLs else false.
+     * @throws Exception on any error.
+     */
+    public static void verifyAcls(Map<String, Object> conf, final boolean 
fixUp) throws Exception {
+        if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            LOG.info("SECURITY IS DISABLED NO FURTHER CHECKS...");
+            //There is no security so we are done.
+            return;
+        }
+        ACL superUserAcl = Utils.getSuperUserAcl(conf);
+        List<ACL> superAcl = new ArrayList<>(1);
+        superAcl.add(superUserAcl);
+
+        List<String> zkServers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        int port = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
+        String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+
+        try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, 
"",
+            new DefaultWatcherCallBack(), conf, superAcl)) {
+            if (zk.checkExists().forPath(stormRoot) != null) {
+                //First off we want to verify that ROOT is good
+                verifyAclStrict(zk, superAcl, stormRoot, fixUp);
+            } else {
+                LOG.warn("{} does not exist no need to check any more...", 
stormRoot);
+                return;
+            }
+        }
+
+        // Now that the root is fine we can start to look at the other paths 
under it.
+        try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, 
stormRoot,
+            new DefaultWatcherCallBack(), conf, superAcl)) {
+            //Next verify that the blob store is correct before we start it up.
+            if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != 
null) {
+                verifyAclStrictRecursive(zk, superAcl, 
ClusterUtils.BLOBSTORE_SUBTREE, fixUp);
+            }
+
+            if 
(zk.checkExists().forPath(ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE)
 != null) {
+                verifyAclStrict(zk, superAcl, 
ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE, fixUp);
+            }
+
+            //The blobstore is good, now lets get the list of all topo Ids
+            Set<String> topoIds = new HashSet<>();
+            if (zk.checkExists().forPath(ClusterUtils.STORMS_SUBTREE) != null) 
{
+                
topoIds.addAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+            }
+
+            Map<String, Id> topoToZkCreds = new HashMap<>();
+            //Now lets get the creds for the topos so we can verify those as 
well.
+            BlobStore bs = Utils.getNimbusBlobStore(conf, 
NimbusInfo.fromConf(conf));
+            try {
+                Subject nimbusSubject = new Subject();
+                nimbusSubject.getPrincipals().add(new NimbusPrincipal());
+                for (String topoId: topoIds) {
+                    try {
+                        String blobKey = topoId + "-stormconf.ser";
+                        Map<String, Object> topoConf = 
Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject));
+                        String payload = (String) 
topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+                        try {
+                            topoToZkCreds.put(topoId, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
+                        } catch (NoSuchAlgorithmException e) {
+                            throw new RuntimeException(e);
+                        }
+                    } catch (KeyNotFoundException knf) {
+                        LOG.debug("topo removed {}", topoId, knf);
+                    }
+                }
+            } finally {
+                if (bs != null) {
+                    bs.shutdown();
+                }
+            }
+
+            verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, 
ClusterUtils.STORMS_SUBTREE, topoToZkCreds, fixUp);
+            verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, 
ClusterUtils.ASSIGNMENTS_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on credentials where they can be leaked in some 
versions of storm.
+            verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, 
ClusterUtils.CREDENTIALS_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on logconfig where they can be leaked in some 
versions of storm.
+            verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, 
ClusterUtils.LOGCONFIG_SUBTREE, topoToZkCreds, fixUp);
+            //There is a race on backpressure too...
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, 
ClusterUtils.BACKPRESSURE_SUBTREE, topoToZkCreds, fixUp);
+
+            if (zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) != null) 
{
+                //errors is a bit special because in older versions of storm 
the worker created the parent directories lazily
+                // because of this it means we need to auto create at least 
the topo-id directory for all running topos.
+                for (String topoId : topoToZkCreds.keySet()) {
+                    String path = ClusterUtils.errorStormRoot(topoId);
+                    if (zk.checkExists().forPath(path) == null) {
+                        LOG.warn("Creating missing errors location {}", path);
+                        zk.create().withACL(getTopoReadWrite(path, topoId, 
topoToZkCreds, superUserAcl, fixUp)).forPath(path);
+                    }
+                }
+            }
+            //Error should not be leaked according to the code, but they are 
not important enough to fail the build if
+            // for some odd reason they are leaked.
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, 
ClusterUtils.ERRORS_SUBTREE, topoToZkCreds, fixUp);
+
+            if (zk.checkExists().forPath(ClusterUtils.NIMBUSES_SUBTREE) != 
null) {
+                verifyAclStrictRecursive(zk, superAcl, 
ClusterUtils.NIMBUSES_SUBTREE, fixUp);
+            }
+
+            if (zk.checkExists().forPath("/leader-lock") != null) {
+                verifyAclStrictRecursive(zk, superAcl, "/leader-lock", fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.PROFILERCONFIG_SUBTREE) 
!= null) {
+                verifyAclStrictRecursive(zk, superAcl, 
ClusterUtils.PROFILERCONFIG_SUBTREE, fixUp);
+            }
+
+            if (zk.checkExists().forPath(ClusterUtils.SUPERVISORS_SUBTREE) != 
null) {
+                verifyAclStrictRecursive(zk, superAcl, 
ClusterUtils.SUPERVISORS_SUBTREE, fixUp);
+            }
+
+            // When moving to pacemaker workerbeats can be leaked too...
+            verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, 
ClusterUtils.WORKERBEATS_SUBTREE, topoToZkCreds, fixUp);
+        }
+    }
+
+    private static List<ACL> getTopoAcl(String path, String topoId, 
Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp, int perms) {
+        Id id = topoToZkCreds.get(topoId);
+        if (id == null) {
+            String error = "Could not find credentials for topology " + topoId 
+ " at path " + path + ".";
+            if (fixUp) {
+                error += " Don't know how to fix this automatically. Please 
add needed ACLs, or delete the path.";
+            }
+            throw new IllegalStateException(error);
+        }
+        List<ACL> ret = new ArrayList<>(2);
+        ret.add(superAcl);
+        ret.add(new ACL(perms, id));
+        return ret;
+    }
+
+    private static List<ACL> getTopoReadWrite(String path, String topoId, 
Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp) {
+        return getTopoAcl(path, topoId, topoToZkCreds, superAcl, fixUp, 
ZooDefs.Perms.ALL);
+    }
+
+    private static void 
verifyParentWithTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, 
String path,
+                                                     Map<String, Id> 
topoToZkCreds, boolean fixUp, int perms) throws Exception {
+        if (zk.checkExists().forPath(path) != null) {
+            verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+            Set<String> possiblyBadIds = new HashSet<>();
+            for (String topoId : zk.getChildren().forPath(path)) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+                if (!topoToZkCreds.containsKey(topoId)) {
+                    //Save it to try again later...
+                    possiblyBadIds.add(topoId);
+                } else {
+                    List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, 
superUserAcl, fixUp, perms);
+                    verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp);
+                }
+            }
+
+            if (!possiblyBadIds.isEmpty()) {
+                //Lets reread the children in STORMS as the source of truth 
and see if a new one was created in the background
+                
possiblyBadIds.removeAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+                for (String topoId: possiblyBadIds) {
+                    //Now we know for sure that this is a bad id
+                    String childPath = path + ClusterUtils.ZK_SEPERATOR + 
topoId;
+                    zk.delete().deletingChildrenIfNeeded().forPath(childPath);
+                }
+            }
+        }
+    }
+
+    private static void 
verifyParentWithReadOnlyTopoChildrenDeleteDead(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                             Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, 
topoToZkCreds, fixUp, ZooDefs.Perms.READ);
+    }
+
+    private static void 
verifyParentWithReadWriteTopoChildrenDeleteDead(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                              Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, 
topoToZkCreds, fixUp, ZooDefs.Perms.ALL);
+    }
+
+    private static void verifyParentWithTopoChildren(CuratorFramework zk, ACL 
superUserAcl, String path,
+                                                     Map<String, Id> 
topoToZkCreds, boolean fixUp, int perms) throws Exception {
+        if (zk.checkExists().forPath(path) != null) {
+            verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+            for (String topoId : zk.getChildren().forPath(path)) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+                List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, 
superUserAcl, fixUp, perms);
+                verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp);
+            }
+        }
+    }
+
+    private static void verifyParentWithReadOnlyTopoChildren(CuratorFramework 
zk, ACL superUserAcl, String path,
+                                                             Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, 
fixUp, ZooDefs.Perms.READ);
+    }
+
+    private static void verifyParentWithReadWriteTopoChildren(CuratorFramework 
zk, ACL superUserAcl, String path,
+                                                              Map<String, Id> 
topoToZkCreds, boolean fixUp) throws Exception {
+        verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, 
fixUp, ZooDefs.Perms.ALL);
+    }
+
+    private static void verifyAclStrictRecursive(CuratorFramework zk, 
List<ACL> strictAcl, String path, boolean fixUp) throws Exception {
+        verifyAclStrict(zk, strictAcl, path, fixUp);
+        for (String child: zk.getChildren().forPath(path)) {
+            String newPath = path + ClusterUtils.ZK_SEPERATOR + child;
+            verifyAclStrictRecursive(zk, strictAcl, newPath, fixUp);
+        }
+    }
+
+    private static void verifyAclStrict(CuratorFramework zk, List<ACL> 
strictAcl, String path, boolean fixUp) throws Exception {
+        try {
+            List<ACL> foundAcl = zk.getACL().forPath(path);
+            if (!equivalent(foundAcl, strictAcl)) {
+                if (fixUp) {
+                    LOG.warn("{} expected to have ACL {}, but has {}.  
Fixing...", path, strictAcl, foundAcl);
+                    zk.setACL().withACL(strictAcl).forPath(path);
+                } else {
+                    throw new IllegalStateException(path + " did not have the 
correct ACL found " + foundAcl + " expected " + strictAcl);
+                }
+            }
+        } catch (KeeperException.NoNodeException ne) {
+            LOG.debug("{} removed in the middle of checking it", ne);
+        }
+    }
+
+    private static boolean equivalent(List<ACL> a, List<ACL> b) {
+        if (a.size() == b.size()) {
+            for (ACL aAcl: a) {
+                if (!b.contains(aAcl)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public static void main(String [] args) throws Exception {
+        Map<String, Object> conf = Utils.readStormConfig();
+        boolean fixUp = false;
+        for (String arg: args) {
+            String a = arg.toLowerCase();
+            if ("-f".equals(a) || "--fixup".equals(a)) {
+                fixUp = true;
+            } else {
+                throw new IllegalArgumentException("Unsupported argument " + 
arg + " only -f or --fixup is supported.");
+            }
+        }
+        verifyAcls(conf, fixUp);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java 
b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index a2ad797..a025ed9 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -36,6 +36,7 @@ import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.VersionedData;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
@@ -47,6 +48,7 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
@@ -93,28 +95,28 @@ public class Zookeeper {
         _instance = INSTANCE;
     }
 
-    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root) {
-        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack());
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, List<ACL> defaultAcl) {
+        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack(), null, defaultAcl);
     }
 
-    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, Map authConf) {
-        return mkClientImpl(conf, servers, port, "", new 
DefaultWatcherCallBack(), authConf);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, Map authConf, List<ACL> defaultAcl) {
+        return mkClientImpl(conf, servers, port, "", new 
DefaultWatcherCallBack(), authConf, defaultAcl);
     }
 
-    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, Map authConf) {
-        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack(), authConf);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, Map authConf, List<ACL> defaultAcl) {
+        return mkClientImpl(conf, servers, port, root, new 
DefaultWatcherCallBack(), authConf, defaultAcl);
     }
 
-    public static CuratorFramework mkClient(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher, Map authConf) {
-        return _instance.mkClientImpl(conf, servers, port, root, watcher, 
authConf);
+    public static CuratorFramework mkClient(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher, Map authConf, 
List<ACL> defaultAcl) {
+        return _instance.mkClientImpl(conf, servers, port, root, watcher, 
authConf, defaultAcl);
     }
 
-    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher, Map authConf) {
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher, Map authConf, 
List<ACL> defaultAcl) {
         CuratorFramework fk;
         if (authConf != null) {
-            fk = Utils.newCurator(conf, servers, port, root, new 
ZookeeperAuthInfo(authConf));
+            fk = Utils.newCurator(conf, servers, port, root, new 
ZookeeperAuthInfo(authConf), defaultAcl);
         } else {
-            fk = Utils.newCurator(conf, servers, port, root);
+            fk = Utils.newCurator(conf, servers, port, root, defaultAcl);
         }
 
         fk.getCuratorListenable().addListener(new CuratorListener() {
@@ -136,8 +138,9 @@ public class Zookeeper {
      *
      * @return
      */
-    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher) {
-        return mkClientImpl(conf, servers, port, root, watcher, null);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, 
Object port, String root, final WatcherCallBack watcher,
+                                          List<ACL> defaultAcl) {
+        return mkClientImpl(conf, servers, port, root, watcher, 
null,defaultAcl);
     }
 
     public static String createNode(CuratorFramework zk, String path, byte[] 
data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) {
@@ -454,14 +457,14 @@ public class Zookeeper {
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map conf, BlobStore 
blobStore) throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf, blobStore);
+    public static ILeaderElector zkLeaderElector(Map conf, BlobStore 
blobStore, List<ACL> defaultAcl) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, blobStore, defaultAcl);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore 
blobStore) throws UnknownHostException {
+    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore 
blobStore, List<ACL> defaultAcl) throws UnknownHostException {
         List<String> servers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
+        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf, 
defaultAcl);
         String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + 
"/leader-lock";
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new 
AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj 
b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 55b686e..77c371d 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -180,21 +180,21 @@
           base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} 
"")
           base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} 
"")]
       (is (= [] (.assignments state nil)))
-      (.set-assignment! state "storm1" assignment1)
+      (.set-assignment! state "storm1" assignment1 {})
       (is (= assignment1 (.assignment-info state "storm1" nil)))
       (is (= nil (.assignment-info state "storm3" nil)))
-      (.set-assignment! state "storm1" assignment2)
-      (.set-assignment! state "storm3" assignment1)
+      (.set-assignment! state "storm1" assignment2 {})
+      (.set-assignment! state "storm3" assignment1 {})
       (is (= #{"storm1" "storm3"} (set (.assignments state nil))))
       (is (= assignment2 (.assignment-info state "storm1" nil)))
       (is (= assignment1 (.assignment-info state "storm3" nil)))
       
       (is (= [] (.active-storms state)))
-      (.activate-storm! state "storm1" base1)
+      (.activate-storm! state "storm1" base1 {})
       (is (= ["storm1"] (.active-storms state)))
       (is (= base1 (.storm-base state "storm1" nil)))
       (is (= nil (.storm-base state "storm2" nil)))
-      (.activate-storm! state "storm2" base2)
+      (.activate-storm! state "storm2" base2 {})
       (is (= base1 (.storm-base state "storm1" nil)))
       (is (= base2 (.storm-base state "storm2" nil)))
       (is (= #{"storm1" "storm2"} (set (.active-storms state))))

http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/test/clj/org/apache/storm/utils_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/utils_test.clj 
b/storm-core/test/clj/org/apache/storm/utils_test.clj
index b59967a..0d1d67d 100644
--- a/storm-core/test/clj/org/apache/storm/utils_test.clj
+++ b/storm-core/test/clj/org/apache/storm/utils_test.clj
@@ -32,7 +32,7 @@
            Config/STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING expected_ceiling})
         servers ["bogus_server"]
         arbitrary_port 42
-        curator (Utils/newCurator conf servers arbitrary_port nil)
+        curator (Utils/newCurator conf servers arbitrary_port nil nil)
         retry (-> curator .getZookeeperClient .getRetryPolicy)
        ]
     (is (.isAssignableFrom ExponentialBackoffRetry (.getClass retry)))

Reply via email to