Repository: storm Updated Branches: refs/heads/1.1.x-branch 58f7aefb8 -> 22a962073
STORM-3026: Upgrade ZK instance for security Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22a96207 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22a96207 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22a96207 Branch: refs/heads/1.1.x-branch Commit: 22a962073c5f12dc5ab281a15d93eb5efc31ab6b Parents: 58f7aef Author: Robert Evans <ev...@yahoo-inc.com> Authored: Fri Apr 13 16:46:42 2018 -0500 Committer: Robert Evans <ev...@yahoo-inc.com> Committed: Fri Apr 13 16:46:42 2018 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 2 + storm-core/src/clj/org/apache/storm/cluster.clj | 61 ++-- .../cluster_state/zookeeper_state_factory.clj | 4 +- .../apache/storm/command/shell_submission.clj | 24 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 41 ++- .../src/clj/org/apache/storm/zookeeper.clj | 8 +- storm-core/src/jvm/org/apache/storm/Config.java | 15 + .../apache/storm/blobstore/BlobStoreUtils.java | 9 +- .../storm/blobstore/BlobSynchronizer.java | 3 +- .../storm/blobstore/KeySequenceNumber.java | 5 +- .../storm/blobstore/LocalFsBlobStore.java | 8 +- .../org/apache/storm/cluster/ClusterUtils.java | 30 +- .../storm/cluster/IStormClusterState.java | 14 +- .../storm/cluster/StormClusterStateImpl.java | 38 ++- .../apache/storm/cluster/ZKStateStorage.java | 14 +- .../daemon/supervisor/SupervisorUtils.java | 5 +- .../transactional/state/TransactionalState.java | 4 +- .../topology/state/TransactionalState.java | 4 +- .../src/jvm/org/apache/storm/utils/Utils.java | 69 +++-- .../apache/storm/zookeeper/AclEnforcement.java | 301 +++++++++++++++++++ .../org/apache/storm/zookeeper/Zookeeper.java | 37 +-- .../test/clj/org/apache/storm/cluster_test.clj | 10 +- .../test/clj/org/apache/storm/utils_test.clj | 2 +- 23 files changed, 568 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 46a4d87..d8029ff 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -48,6 +48,8 @@ storm.messaging.transport: "org.apache.storm.messaging.netty.Context" storm.nimbus.retry.times: 5 storm.nimbus.retry.interval.millis: 2000 storm.nimbus.retry.intervalceiling.millis: 60000 +storm.nimbus.zookeeper.acls.check: true +storm.nimbus.zookeeper.acls.fixup: true storm.auth.simple-white-list.users: [] storm.auth.simple-acl.users: [] storm.auth.simple-acl.users.commands: [] http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj index 810b3c3..f1a0412 100644 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -30,13 +30,22 @@ (:require [org.apache.storm [zookeeper :as zk]]) (:require [org.apache.storm.daemon [common :as common]])) -(defn mk-topo-only-acls - [topo-conf] +(defn mk-topo-acls + [topo-conf type] (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)] (when (Utils/isZkAuthenticationConfiguredTopology topo-conf) [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))]))) - + (ACL. type (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))]))) + +(defn mk-topo-read-write-acls + [topo-conf] + (mk-topo-acls topo-conf ZooDefs$Perms/ALL)) + +(defn mk-topo-read-only-acls + [topo-conf] + [topo-conf] + (mk-topo-acls topo-conf ZooDefs$Perms/READ)) + (defnk mk-distributed-cluster-state [conf :auth-conf nil :acls nil :context (ClusterStateContext.)] (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE) @@ -68,26 +77,27 @@ (executor-beats [this storm-id executor->node+port]) (supervisors [this callback]) (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist - (setup-heartbeats! [this storm-id]) + (setup-heartbeats! [this storm-id topo-conf]) (teardown-heartbeats! [this storm-id]) (teardown-topology-errors! [this storm-id]) (heartbeat-storms [this]) (error-topologies [this]) (backpressure-topologies [this]) - (set-topology-log-config! [this storm-id log-config]) + (set-topology-log-config! [this storm-id log-config topo-conf]) (topology-log-config [this storm-id cb]) (worker-heartbeat! [this storm-id node port info]) (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (worker-backpressure! [this storm-id node port info]) (topology-backpressure [this storm-id callback]) - (setup-backpressure! [this storm-id]) + (setup-backpressure! [this storm-id topo-conf]) (remove-backpressure! [this storm-id]) (remove-worker-backpressure! [this storm-id node port]) - (activate-storm! [this storm-id storm-base]) + (activate-storm! [this storm-id storm-base topo-conf]) (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) - (set-assignment! [this storm-id info]) + (setup-errors! [this storm-id topo-conf]) + (set-assignment! [this storm-id info topo-conf]) ;; sets up information related to key consisting of nimbus ;; host:port and version info of the blob (setup-blobstore! [this key nimbusInfo versionInfo]) @@ -416,8 +426,9 @@ (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig)) (set-topology-log-config! - [this storm-id log-config] - (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls)) + [this storm-id log-config topo-conf] + (.mkdirs cluster-state LOGCONFIG-SUBTREE acls) + (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) (mk-topo-read-only-acls topo-conf))) (set-worker-profile-request [this storm-id profile-request] @@ -472,8 +483,9 @@ (.delete_worker_hb cluster-state (workerbeat-path storm-id node port))) (setup-heartbeats! - [this storm-id] - (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls)) + [this storm-id topo-conf] + (.mkdirs cluster-state WORKERBEATS-SUBTREE acls) + (.mkdirs cluster-state (workerbeat-storm-root storm-id) (mk-topo-read-write-acls topo-conf))) (teardown-heartbeats! [this storm-id] @@ -506,8 +518,9 @@ (> (count children) 0))) (setup-backpressure! - [this storm-id] - (.mkdirs cluster-state (backpressure-storm-root storm-id) acls)) + [this storm-id topo-conf] + (.mkdirs cluster-state BACKPRESSURE-SUBTREE acls) + (.mkdirs cluster-state (backpressure-storm-root storm-id) (mk-topo-read-write-acls topo-conf))) (remove-backpressure! [this storm-id] @@ -533,9 +546,10 @@ (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls))) (activate-storm! - [this storm-id storm-base] + [this storm-id storm-base topo-conf] (let [thrift-storm-base (thriftify-storm-base storm-base)] - (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls))) + (.mkdirs cluster-state STORMS-SUBTREE acls) + (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) (mk-topo-read-only-acls topo-conf)))) (update-storm! [this storm-id new-elems] @@ -562,9 +576,10 @@ (.delete_node cluster-state (storm-path storm-id))) (set-assignment! - [this storm-id info] + [this storm-id info topo-conf] + (.mkdirs cluster-state ASSIGNMENTS-SUBTREE acls) (let [thrift-assignment (thriftify-assignment info)] - (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) + (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) (mk-topo-read-only-acls topo-conf)))) (remove-blobstore-key! [this blob-key] @@ -585,9 +600,10 @@ (set-credentials! [this storm-id creds topo-conf] - (let [topo-acls (mk-topo-only-acls topo-conf) + (let [topo-acls (mk-topo-read-only-acls topo-conf) path (credentials-path storm-id) thriftified-creds (thriftify-credentials creds)] + (.mkdirs cluster-state CREDENTIALS-SUBTREE acls) (.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls))) (credentials @@ -596,6 +612,11 @@ (swap! credentials-callback assoc storm-id callback)) (clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials))) + (setup-errors! + [this storm-id topo-conf] + (.mkdirs cluster-state ERRORS-SUBTREE acls) + (.mkdirs cluster-state (error-storm-root storm-id) (mk-topo-read-write-acls topo-conf))) + (report-error [this storm-id component-id node port error] (let [path (error-path storm-id component-id) http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj index fdef972..d2915eb 100644 --- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj +++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj @@ -28,7 +28,7 @@ (ZKStateStorage. conf, auth-conf, acls, context)) (defn -mkState [this conf auth-conf acls context] - (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)] + (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) acls :auth-conf auth-conf)] (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls) (.close zk)) (let [callbacks (atom {}) @@ -36,6 +36,7 @@ zk-writer (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) + acls :auth-conf auth-conf :root (conf STORM-ZOOKEEPER-ROOT) :watcher (fn [state type path] @@ -50,6 +51,7 @@ (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) + acls :auth-conf auth-conf :root (conf STORM-ZOOKEEPER-ROOT) :watcher (fn [state type path] http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 887ab3b..1bb3d10 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -15,20 +15,18 @@ ;; limitations under the License. (ns org.apache.storm.command.shell-submission (:import [org.apache.storm StormSubmitter]) - (:use [org.apache.storm thrift util config log zookeeper]) + (:use [org.apache.storm util config log]) (:require [clojure.string :as str]) + (:import [org.apache.storm.utils ConfigUtils NimbusClient]) (:gen-class)) - (defn -main [^String tmpjarpath & args] - (let [conf (read-storm-config) - ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok - zk-leader-elector (zk-leader-elector conf nil) - leader-nimbus (.getLeader zk-leader-elector) - host (.getHost leader-nimbus) - port (.getPort leader-nimbus) - no-op (.close zk-leader-elector) - jarpath (StormSubmitter/submitJar conf tmpjarpath) - args (concat args [host port jarpath])] - (exec-command! (str/join " " args)) - )) + (let [conf (clojurify-structure (ConfigUtils/readStormConfig))] + (with-open [client (NimbusClient/getConfiguredClient conf)] + (let [c (.getClient client) + ns (.getLeader c) + host (.get_host ns) + port (.get_port ns) + jarpath (StormSubmitter/submitJar conf tmpjarpath) + args (concat args [host port jarpath])] + (exec-command! (str/join " " args)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 7607b1b..94ce7fd 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -56,6 +56,7 @@ [stats :as stats]]) (:require [org.apache.storm.ui.core :as ui]) (:require [clojure.set :as set]) + (:import [org.apache.storm.zookeeper AclEnforcement]) (:import [org.apache.storm.daemon.common StormBase Assignment]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm config]) @@ -133,6 +134,16 @@ scheduler )) +(def NIMBUS-ZK-ACLS ZooDefs$Ids/CREATOR_ALL_ACL) + +(defn mk-zk-client [conf] + (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS) + zk-port (conf STORM-ZOOKEEPER-PORT) + zk-root (conf STORM-ZOOKEEPER-ROOT)] + (if (and zk-servers zk-port) + (mk-client conf zk-servers zk-port (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil) :root zk-root + :auth-conf conf)))) + (defmulti blob-sync cluster-mode) (defnk is-leader [nimbus :throw-exception true] @@ -142,10 +153,6 @@ (let [leader-address (.getLeader leader-elector)] (throw (RuntimeException. (str "not a leader, current leader is " leader-address)))))))) -(def NIMBUS-ZK-ACLS - [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)]) - (defn mk-blob-cache-map "Constructs a TimeCacheMap instance with a blob store timeout whose expiration callback invokes cancel on the value held by an expired entry when @@ -213,7 +220,7 @@ (exit-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) - :leader-elector (zk-leader-elector conf blob-store) + :leader-elector (zk-leader-elector conf blob-store (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil)) :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies @@ -454,7 +461,7 @@ (defn- get-version-for-key [key nimbus-host-port-info conf] (let [version (KeySequenceNumber. key nimbus-host-port-info)] - (.getKeySequenceNumber version conf))) + (.getKeySequenceNumber version conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil)))) (defn get-key-seq-from-blob-store [blob-store] (let [key-iter (.listKeys blob-store)] @@ -1002,7 +1009,7 @@ td (.get tds tid) assignment (if (and (not (:owner assignment)) (not (nil? td))) (let [new-assignment (fixup-assignment assignment td)] - (.set-assignment! storm-cluster-state tid new-assignment) + (.set-assignment! storm-cluster-state tid new-assignment (.getConf td)) new-assignment) assignment)] {tid assignment}))))] @@ -1060,7 +1067,7 @@ (log-debug "Assignment for " topology-id " hasn't changed") (do (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment)) - (.set-assignment! storm-cluster-state topology-id assignment) + (.set-assignment! storm-cluster-state topology-id assignment (.getConf topology-details)) ))) (->> new-assignments (map (fn [[topology-id assignment]] @@ -1098,7 +1105,8 @@ nil nil {} - principal)) + principal) + storm-conf) (notify-topology-action-listener nimbus storm-name "activate"))) (defn storm-active? [storm-cluster-state storm-name] @@ -1744,9 +1752,10 @@ (log-message "uploadedJar " uploadedJarLocation) (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology) (wait-for-desired-code-replication nimbus total-storm-conf storm-id) - (.setup-heartbeats! storm-cluster-state storm-id) + (.setup-heartbeats! storm-cluster-state storm-id total-storm-conf) + (.setup-errors! storm-cluster-state storm-id total-storm-conf) (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE) - (.setup-backpressure! storm-cluster-state storm-id)) + (.setup-backpressure! storm-cluster-state storm-id total-storm-conf)) (notify-topology-action-listener nimbus storm-name "submitTopology") (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] @@ -1892,7 +1901,7 @@ (.containsKey named-loggers logger-name)) (.remove named-loggers logger-name)))))) (log-message "Setting log config for " storm-name ":" merged-log-config) - (.set-topology-log-config! storm-cluster-state id merged-log-config))) + (.set-topology-log-config! storm-cluster-state id merged-log-config topology-conf))) (uploadNewCredentials [this storm-name credentials] (mark! nimbus:num-uploadNewCredentials-calls) @@ -2565,8 +2574,12 @@ (defn -launch [nimbus] (let [conf (merge (read-storm-config) - (read-yaml-config "storm-cluster-auth.yaml" false))] - (launch-server! conf nimbus))) + (read-yaml-config "storm-cluster-auth.yaml" false)) + fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-FIXUP) + check-acl (or fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-CHECK))] + (when check-acl + (AclEnforcement/verifyAcls conf fixup-acl)) + (launch-server! conf nimbus))) (defn standalone-nimbus [] (reify INimbus http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/clj/org/apache/storm/zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj index ca41093..43795e5 100644 --- a/storm-core/src/clj/org/apache/storm/zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj @@ -54,11 +54,11 @@ (log-message "Zookeeper state update: " state type path)) (defnk mk-client - [conf servers port + [conf servers port default-acl :root "" :watcher default-watcher :auth-conf nil] - (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))] + (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)) default-acl)] (.. fk (getCuratorListenable) (addListener @@ -252,9 +252,9 @@ (defn zk-leader-elector "Zookeeper Implementation of ILeaderElector." - [conf blob-store] + [conf blob-store default-acl] (let [servers (conf STORM-ZOOKEEPER-SERVERS) - zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf) + zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) default-acl :auth-conf conf) leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") id (.toHostPortString (NimbusInfo/fromConf conf)) leader-latch (atom (LeaderLatch. zk leader-lock-path id)) http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 370d027..988540f 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -53,6 +53,21 @@ public class Config extends HashMap<String, Object> { private static final long serialVersionUID = -1550278723792864455L; /** + * In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not + * don't start nimbus. + */ + @isBoolean + public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = "storm.nimbus.zookeeper.acls.check"; + + /** + * In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not do + * your best to fix them before nimbus starts, if it cannot fix them nimbus will not start. + * This overrides any value set for storm.nimbus.zookeeper.acls.check. + */ + @isBoolean + public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = "storm.nimbus.zookeeper.acls.fixup"; + + /** * This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for * the user Nimbus and Supervisors use to authenticate with ZK. */ http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java index f1eb2f4..89bc610 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java @@ -33,6 +33,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,15 @@ public class BlobStoreUtils { private static final String BLOB_DEPENDENCIES_PREFIX = "dep-"; private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class); - public static CuratorFramework createZKClient(Map conf) { + public static String getBlobStoreSubtree() { + return BLOBSTORE_SUBTREE; + } + + public static CuratorFramework createZKClient(Map conf, List<ACL> defaultAcls) { List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); - CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo); + CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, defaultAcls); zkClient.start(); return zkClient; } http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java index f035709..9e888e8 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java @@ -20,6 +20,7 @@ package org.apache.storm.blobstore; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public class BlobSynchronizer { public synchronized void syncBlobs() { try { LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); - zkClient = BlobStoreUtils.createZKClient(conf); + zkClient = BlobStoreUtils.createZKClient(conf, ZooDefs.Ids.CREATOR_ALL_ACL); deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); updateKeySetForBlobStore(getBlobStoreKeySet()); Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java index adbd4c4..c175a97 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,9 +133,9 @@ public class KeySequenceNumber { this.nimbusInfo = nimbusInfo; } - public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException { + public synchronized int getKeySequenceNumber(Map conf, List<ACL> defaultAcls) throws KeyNotFoundException { TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); - CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); + CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf, defaultAcls); try { // Key has not been created yet and it is the first time it is being created if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java index c9326e2..be4c3ad 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -28,6 +28,8 @@ import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Utils; import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +85,11 @@ public class LocalFsBlobStore extends BlobStore { public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) { this.conf = conf; this.nimbusInfo = nimbusInfo; - zkClient = BlobStoreUtils.createZKClient(conf); + List<ACL> acl = null; + if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { + acl = ZooDefs.Ids.CREATOR_ALL_ACL; + } + zkClient = BlobStoreUtils.createZKClient(conf, acl); if (overrideBase == null) { overrideBase = ConfigUtils.absoluteStormBlobStoreDir(conf); } http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java index 96c177b..11ab035 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java @@ -43,7 +43,6 @@ public class ClusterUtils { public static final String ZK_SEPERATOR = "/"; public static final String ASSIGNMENTS_ROOT = "assignments"; - public static final String CODE_ROOT = "code"; public static final String STORMS_ROOT = "storms"; public static final String SUPERVISORS_ROOT = "supervisors"; public static final String WORKERBEATS_ROOT = "workerbeats"; @@ -91,15 +90,38 @@ public class ClusterUtils { _instance = INSTANCE; } - public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException { + /** + * Get ZK ACLs for a topology to have read/write access. + * @param topoConf the topology config. + * @return the ACLs. + */ + public static List<ACL> mkTopoReadWriteAcls(Map<String, Object> topoConf) { + return mkTopoAcls(topoConf, ZooDefs.Perms.ALL); + } + + /** + * Get ZK ACLs for a topology to have read only access. + * @param topoConf the topology config. + * @return the ACLs. + */ + public static List<ACL> mkTopoReadOnlyAcls(Map<String, Object> topoConf) { + return mkTopoAcls(topoConf, ZooDefs.Perms.READ); + } + + private static List<ACL> mkTopoAcls(Map<String, Object> topoConf, int perms) { List<ACL> aclList = null; String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) { aclList = new ArrayList<>(); ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0); aclList.add(acl1); - ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload))); - aclList.add(acl2); + try { + ACL acl2 = new ACL(perms, new Id("digest", DigestAuthenticationProvider.generateDigest(payload))); + aclList.add(acl2); + } catch (NoSuchAlgorithmException e) { + //Should only happen on a badly configured system + throw new RuntimeException(e); + } } return aclList; } http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java index a6f07ed..c93a6b2 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -59,7 +59,7 @@ public interface IStormClusterState { public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist - public void setupHeatbeats(String stormId); + public void setupHeatbeats(String stormId, Map<String, Object> topoConf); public void teardownHeartbeats(String stormId); @@ -71,7 +71,7 @@ public interface IStormClusterState { public List<String> backpressureTopologies(); - public void setTopologyLogConfig(String stormId, LogConfig logConfig); + public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf); public LogConfig topologyLogConfig(String stormId, Runnable cb); @@ -85,19 +85,19 @@ public interface IStormClusterState { public boolean topologyBackpressure(String stormId, Runnable callback); - public void setupBackpressure(String stormId); + public void setupBackpressure(String stormId, Map<String, Object> topoConf); public void removeBackpressure(String stormId); public void removeWorkerBackpressure(String stormId, String node, Long port); - public void activateStorm(String stormId, StormBase stormBase); + public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf); public void updateStorm(String stormId, StormBase newElems); public void removeStormBase(String stormId); - public void setAssignment(String stormId, Assignment info); + public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf); public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo); @@ -113,11 +113,13 @@ public interface IStormClusterState { public void reportError(String stormId, String componentId, String node, Long port, Throwable error); + void setupErrors(String stormId, Map<String, Object> topoConf); + public List<ErrorInfo> errors(String stormId, String componentId); public ErrorInfo lastError(String stormId, String componentId); - public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException; + public void setCredentials(String stormId, Credentials creds, Map topoConf); public Credentials credentials(String stormId, Runnable callback); http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index f9952cf..2d26c2f 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -33,7 +33,6 @@ import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -243,7 +242,6 @@ public class StormClusterStateImpl implements IStormClusterState { public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) { byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false); return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class); - } @Override @@ -338,8 +336,9 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setupHeatbeats(String stormId) { - stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls); + public void setupHeatbeats(String stormId, Map<String, Object> topoConf) { + stateStorage.mkdirs(ClusterUtils.WORKERBEATS_SUBTREE, acls); + stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf)); } @Override @@ -386,8 +385,9 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setTopologyLogConfig(String stormId, LogConfig logConfig) { - stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls); + public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf) { + stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, acls); + stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf)); } @Override @@ -467,8 +467,9 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setupBackpressure(String stormId) { - stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls); + public void setupBackpressure(String stormId, Map<String, Object> topoConf) { + stateStorage.mkdirs(ClusterUtils.BACKPRESSURE_SUBTREE, acls); + stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf)); } @Override @@ -495,9 +496,10 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void activateStorm(String stormId, StormBase stormBase) { + public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) { String path = ClusterUtils.stormPath(stormId); - stateStorage.set_data(path, Utils.serialize(stormBase), acls); + stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, acls); + stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf)); } /** @@ -589,8 +591,9 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setAssignment(String stormId, Assignment info) { - stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls); + public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf) { + stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, acls); + stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf)); } @Override @@ -639,6 +642,12 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override + public void setupErrors(String stormId, Map<String, Object> topoConf) { + stateStorage.mkdirs(ClusterUtils.ERRORS_SUBTREE, acls); + stateStorage.mkdirs(ClusterUtils.errorStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf)); + } + + @Override public void reportError(String stormId, String componentId, String node, Long port, Throwable error) { String path = ClusterUtils.errorPath(stormId, componentId); @@ -708,11 +717,10 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException { - List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf); + public void setCredentials(String stormId, Credentials creds, Map topoConf) { + List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf); String path = ClusterUtils.credentialsPath(stormId); stateStorage.set_data(path, Utils.serialize(creds), aclList); - } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java index e337b1f..af15dd9 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java @@ -80,15 +80,15 @@ public class ZKStateStorage implements IStateStorage { this.isNimbus = true; // just mkdir STORM_ZOOKEEPER_ROOT dir - CuratorFramework zkTemp = mkZk(); + CuratorFramework zkTemp = mkZk(acls); String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)); Zookeeper.mkdirs(zkTemp, rootPath, acls); zkTemp.close(); active = new AtomicBoolean(true); - zkWriter = mkZk(new ZkWatcherCallBack()); + zkWriter = mkZk(acls, new ZkWatcherCallBack()); if (isNimbus) { - zkReader = mkZk(new ZkWatcherCallBack()); + zkReader = mkZk(acls, new ZkWatcherCallBack()); } else { zkReader = zkWriter; } @@ -96,15 +96,15 @@ public class ZKStateStorage implements IStateStorage { } @SuppressWarnings("unchecked") - private CuratorFramework mkZk() throws IOException { + private CuratorFramework mkZk(List<ACL> acls) throws IOException { return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "", - new DefaultWatcherCallBack(), authConf); + new DefaultWatcherCallBack(), authConf, acls); } @SuppressWarnings("unchecked") - private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException { + private CuratorFramework mkZk(List<ACL> acls, WatcherCallBack watcher) throws NumberFormatException, IOException { return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), - String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf); + String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf, acls); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index ef4b54d..31d1130 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -312,9 +312,6 @@ public class SupervisorUtils { } static List<ACL> supervisorZkAcls() { - final List<ACL> acls = new ArrayList<>(); - acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); - acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); - return acls; + return ZooDefs.Ids.CREATOR_ALL_ACL; } } http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java index 7662daa..e470348 100644 --- a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java @@ -66,7 +66,7 @@ public class TransactionalState { List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS); Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT); ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf); - CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth); + CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth, null); _zkAcls = Utils.getWorkerACL(conf); try { TransactionalState.createNode(initter, transactionalRoot, null, null, null); @@ -78,7 +78,7 @@ public class TransactionalState { } initter.close(); - _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth); + _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth, null); _ser = new KryoValuesSerializer(conf); _des = new KryoValuesDeserializer(conf); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java index 71068dc..5bdb16c 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java @@ -63,7 +63,7 @@ public class TransactionalState { List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS); Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT); ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf); - CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth); + CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth, null); _zkAcls = Utils.getWorkerACL(conf); try { TransactionalState.createNode(initter, transactionalRoot, null, null, null); @@ -75,7 +75,7 @@ public class TransactionalState { } initter.close(); - _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth); + _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth, null); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index b9ced2c..846205a 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; import org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.storm.Config; @@ -1079,15 +1080,16 @@ public class Utils { return false; } - public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) { - return newCurator(conf, servers, port, root, null); + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, List<ACL> defaultAcl) { + return newCurator(conf, servers, port, root, null, defaultAcl); } - public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { - return newCurator(conf, servers, port, "", auth); + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) { + return newCurator(conf, servers, port, "", auth, defaultAcl); } - public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth, + final List<ACL> defaultAcl) { List<String> serverPorts = new ArrayList<String>(); for (String zkServer : servers) { serverPorts.add(zkServer + ":" + Utils.getInt(port)); @@ -1096,6 +1098,19 @@ public class Utils { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); setupBuilder(builder, zkStr, conf, auth); + if (defaultAcl != null) { + builder.aclProvider(new ACLProvider() { + @Override + public List<ACL> getDefaultAcl() { + return defaultAcl; + } + + @Override + public List<ACL> getAclForPath(String s) { + return null; + } + }); + } return builder.build(); } @@ -1142,15 +1157,15 @@ public class Utils { setupBuilder(builder, zkStr, conf, auth); } - public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { - CuratorFramework ret = newCurator(conf, servers, port, root, auth); + public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth, List<ACL> defaultAcl) { + CuratorFramework ret = newCurator(conf, servers, port, root, auth, defaultAcl); LOG.info("Starting Utils Curator..."); ret.start(); return ret; } - public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { - CuratorFramework ret = newCurator(conf, servers, port, auth); + public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) { + CuratorFramework ret = newCurator(conf, servers, port, auth, defaultAcl); LOG.info("Starting Utils Curator (2)..."); ret.start(); return ret; @@ -1227,23 +1242,39 @@ public class Utils { && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); } - - public static List<ACL> getWorkerACL(Map conf) { - //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms - if (!isZkAuthenticationConfiguredTopology(conf)) { - return null; + public static Id parseZkId(String id, String configName) { + String[] split = id.split(":", 2); + if (split.length != 2) { + throw new IllegalArgumentException(configName + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); } + return new Id(split[0], split[1]); + } + + /** + * Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled. + * @param conf the config to get the super User ACL from + * @return the super user ACL. + */ + public static ACL getSuperUserAcl(Map<String, Object> conf) { String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL); if (stormZKUser == null) { throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set"); } - String[] split = stormZKUser.split(":", 2); - if (split.length != 2) { - throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); + return new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, Config.STORM_ZOOKEEPER_SUPERACL)); + } + + /** + * Get the ZK ACLs that a worker should use when writing to ZK. + * @param conf the config for the topology. + * @return the ACLs + */ + public static List<ACL> getWorkerACL(Map<String, Object> conf) { + if (!isZkAuthenticationConfiguredTopology(conf)) { + return null; } ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL); - ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1]))); - return ret; + ret.add(getSuperUserAcl(conf)); + return ret; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java new file mode 100644 index 0000000..b39db99 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.zookeeper; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.security.auth.Subject; +import org.apache.curator.framework.CuratorFramework; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.callback.DefaultWatcherCallBack; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.security.auth.NimbusPrincipal; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is code intended to enforce ZK ACLs. + */ +public class AclEnforcement { + private static final Logger LOG = LoggerFactory.getLogger(AclEnforcement.class); + + /** + * Verify the ZK ACLs are correct and optionally fix them if needed. + * @param conf the cluster config. + * @param fixUp true if we want to fix the ACLs else false. + * @throws Exception on any error. + */ + public static void verifyAcls(Map<String, Object> conf, final boolean fixUp) throws Exception { + if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) { + LOG.info("SECURITY IS DISABLED NO FURTHER CHECKS..."); + //There is no security so we are done. + return; + } + ACL superUserAcl = Utils.getSuperUserAcl(conf); + List<ACL> superAcl = new ArrayList<>(1); + superAcl.add(superUserAcl); + + List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + int port = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT)); + String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT); + + try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, "", + new DefaultWatcherCallBack(), conf, superAcl)) { + if (zk.checkExists().forPath(stormRoot) != null) { + //First off we want to verify that ROOT is good + verifyAclStrict(zk, superAcl, stormRoot, fixUp); + } else { + LOG.warn("{} does not exist no need to check any more...", stormRoot); + return; + } + } + + // Now that the root is fine we can start to look at the other paths under it. + try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, stormRoot, + new DefaultWatcherCallBack(), conf, superAcl)) { + //Next verify that the blob store is correct before we start it up. + if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != null) { + verifyAclStrictRecursive(zk, superAcl, ClusterUtils.BLOBSTORE_SUBTREE, fixUp); + } + + if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE) != null) { + verifyAclStrict(zk, superAcl, ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE, fixUp); + } + + //The blobstore is good, now lets get the list of all topo Ids + Set<String> topoIds = new HashSet<>(); + if (zk.checkExists().forPath(ClusterUtils.STORMS_SUBTREE) != null) { + topoIds.addAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE)); + } + + Map<String, Id> topoToZkCreds = new HashMap<>(); + //Now lets get the creds for the topos so we can verify those as well. + BlobStore bs = Utils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf)); + try { + Subject nimbusSubject = new Subject(); + nimbusSubject.getPrincipals().add(new NimbusPrincipal()); + for (String topoId: topoIds) { + try { + String blobKey = topoId + "-stormconf.ser"; + Map<String, Object> topoConf = Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject)); + String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); + try { + topoToZkCreds.put(topoId, new Id("digest", DigestAuthenticationProvider.generateDigest(payload))); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } catch (KeyNotFoundException knf) { + LOG.debug("topo removed {}", topoId, knf); + } + } + } finally { + if (bs != null) { + bs.shutdown(); + } + } + + verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.STORMS_SUBTREE, topoToZkCreds, fixUp); + verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.ASSIGNMENTS_SUBTREE, topoToZkCreds, fixUp); + //There is a race on credentials where they can be leaked in some versions of storm. + verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.CREDENTIALS_SUBTREE, topoToZkCreds, fixUp); + //There is a race on logconfig where they can be leaked in some versions of storm. + verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.LOGCONFIG_SUBTREE, topoToZkCreds, fixUp); + //There is a race on backpressure too... + verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.BACKPRESSURE_SUBTREE, topoToZkCreds, fixUp); + + if (zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) != null) { + //errors is a bit special because in older versions of storm the worker created the parent directories lazily + // because of this it means we need to auto create at least the topo-id directory for all running topos. + for (String topoId : topoToZkCreds.keySet()) { + String path = ClusterUtils.errorStormRoot(topoId); + if (zk.checkExists().forPath(path) == null) { + LOG.warn("Creating missing errors location {}", path); + zk.create().withACL(getTopoReadWrite(path, topoId, topoToZkCreds, superUserAcl, fixUp)).forPath(path); + } + } + } + //Error should not be leaked according to the code, but they are not important enough to fail the build if + // for some odd reason they are leaked. + verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.ERRORS_SUBTREE, topoToZkCreds, fixUp); + + if (zk.checkExists().forPath(ClusterUtils.NIMBUSES_SUBTREE) != null) { + verifyAclStrictRecursive(zk, superAcl, ClusterUtils.NIMBUSES_SUBTREE, fixUp); + } + + if (zk.checkExists().forPath("/leader-lock") != null) { + verifyAclStrictRecursive(zk, superAcl, "/leader-lock", fixUp); + } + + if (zk.checkExists().forPath(ClusterUtils.PROFILERCONFIG_SUBTREE) != null) { + verifyAclStrictRecursive(zk, superAcl, ClusterUtils.PROFILERCONFIG_SUBTREE, fixUp); + } + + if (zk.checkExists().forPath(ClusterUtils.SUPERVISORS_SUBTREE) != null) { + verifyAclStrictRecursive(zk, superAcl, ClusterUtils.SUPERVISORS_SUBTREE, fixUp); + } + + // When moving to pacemaker workerbeats can be leaked too... + verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.WORKERBEATS_SUBTREE, topoToZkCreds, fixUp); + } + } + + private static List<ACL> getTopoAcl(String path, String topoId, Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp, int perms) { + Id id = topoToZkCreds.get(topoId); + if (id == null) { + String error = "Could not find credentials for topology " + topoId + " at path " + path + "."; + if (fixUp) { + error += " Don't know how to fix this automatically. Please add needed ACLs, or delete the path."; + } + throw new IllegalStateException(error); + } + List<ACL> ret = new ArrayList<>(2); + ret.add(superAcl); + ret.add(new ACL(perms, id)); + return ret; + } + + private static List<ACL> getTopoReadWrite(String path, String topoId, Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp) { + return getTopoAcl(path, topoId, topoToZkCreds, superAcl, fixUp, ZooDefs.Perms.ALL); + } + + private static void verifyParentWithTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception { + if (zk.checkExists().forPath(path) != null) { + verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp); + Set<String> possiblyBadIds = new HashSet<>(); + for (String topoId : zk.getChildren().forPath(path)) { + String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId; + if (!topoToZkCreds.containsKey(topoId)) { + //Save it to try again later... + possiblyBadIds.add(topoId); + } else { + List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, superUserAcl, fixUp, perms); + verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp); + } + } + + if (!possiblyBadIds.isEmpty()) { + //Lets reread the children in STORMS as the source of truth and see if a new one was created in the background + possiblyBadIds.removeAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE)); + for (String topoId: possiblyBadIds) { + //Now we know for sure that this is a bad id + String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId; + zk.delete().deletingChildrenIfNeeded().forPath(childPath); + } + } + } + } + + private static void verifyParentWithReadOnlyTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.READ); + } + + private static void verifyParentWithReadWriteTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.ALL); + } + + private static void verifyParentWithTopoChildren(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception { + if (zk.checkExists().forPath(path) != null) { + verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp); + for (String topoId : zk.getChildren().forPath(path)) { + String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId; + List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, superUserAcl, fixUp, perms); + verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp); + } + } + } + + private static void verifyParentWithReadOnlyTopoChildren(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.READ); + } + + private static void verifyParentWithReadWriteTopoChildren(CuratorFramework zk, ACL superUserAcl, String path, + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.ALL); + } + + private static void verifyAclStrictRecursive(CuratorFramework zk, List<ACL> strictAcl, String path, boolean fixUp) throws Exception { + verifyAclStrict(zk, strictAcl, path, fixUp); + for (String child: zk.getChildren().forPath(path)) { + String newPath = path + ClusterUtils.ZK_SEPERATOR + child; + verifyAclStrictRecursive(zk, strictAcl, newPath, fixUp); + } + } + + private static void verifyAclStrict(CuratorFramework zk, List<ACL> strictAcl, String path, boolean fixUp) throws Exception { + try { + List<ACL> foundAcl = zk.getACL().forPath(path); + if (!equivalent(foundAcl, strictAcl)) { + if (fixUp) { + LOG.warn("{} expected to have ACL {}, but has {}. Fixing...", path, strictAcl, foundAcl); + zk.setACL().withACL(strictAcl).forPath(path); + } else { + throw new IllegalStateException(path + " did not have the correct ACL found " + foundAcl + " expected " + strictAcl); + } + } + } catch (KeeperException.NoNodeException ne) { + LOG.debug("{} removed in the middle of checking it", ne); + } + } + + private static boolean equivalent(List<ACL> a, List<ACL> b) { + if (a.size() == b.size()) { + for (ACL aAcl: a) { + if (!b.contains(aAcl)) { + return false; + } + } + return true; + } + return false; + } + + public static void main(String [] args) throws Exception { + Map<String, Object> conf = Utils.readStormConfig(); + boolean fixUp = false; + for (String arg: args) { + String a = arg.toLowerCase(); + if ("-f".equals(a) || "--fixup".equals(a)) { + fixUp = true; + } else { + throw new IllegalArgumentException("Unsupported argument " + arg + " only -f or --fixup is supported."); + } + } + verifyAcls(conf, fixUp); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java index a2ad797..a025ed9 100644 --- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java @@ -36,6 +36,7 @@ import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.callback.DefaultWatcherCallBack; import org.apache.storm.callback.WatcherCallBack; import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.VersionedData; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; @@ -47,6 +48,7 @@ import org.apache.storm.utils.Utils; import org.apache.storm.utils.ZookeeperAuthInfo; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.NIOServerCnxnFactory; @@ -93,28 +95,28 @@ public class Zookeeper { _instance = INSTANCE; } - public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) { - return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack()); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, List<ACL> defaultAcl) { + return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), null, defaultAcl); } - public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) { - return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf, List<ACL> defaultAcl) { + return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf, defaultAcl); } - public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) { - return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf, List<ACL> defaultAcl) { + return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf, defaultAcl); } - public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { - return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf); + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf, List<ACL> defaultAcl) { + return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf, defaultAcl); } - public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf, List<ACL> defaultAcl) { CuratorFramework fk; if (authConf != null) { - fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf)); + fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf), defaultAcl); } else { - fk = Utils.newCurator(conf, servers, port, root); + fk = Utils.newCurator(conf, servers, port, root, defaultAcl); } fk.getCuratorListenable().addListener(new CuratorListener() { @@ -136,8 +138,9 @@ public class Zookeeper { * * @return */ - public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { - return mkClientImpl(conf, servers, port, root, watcher, null); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, + List<ACL> defaultAcl) { + return mkClientImpl(conf, servers, port, root, watcher, null,defaultAcl); } public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) { @@ -454,14 +457,14 @@ public class Zookeeper { }; } - public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException { - return _instance.zkLeaderElectorImpl(conf, blobStore); + public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore, List<ACL> defaultAcl) throws UnknownHostException { + return _instance.zkLeaderElectorImpl(conf, blobStore, defaultAcl); } - protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException { + protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore, List<ACL> defaultAcl) throws UnknownHostException { List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); - CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf); + CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf, defaultAcl); String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock"; String id = NimbusInfo.fromConf(conf).toHostPortString(); AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index 55b686e..77c371d 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -180,21 +180,21 @@ base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} "") base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} "")] (is (= [] (.assignments state nil))) - (.set-assignment! state "storm1" assignment1) + (.set-assignment! state "storm1" assignment1 {}) (is (= assignment1 (.assignment-info state "storm1" nil))) (is (= nil (.assignment-info state "storm3" nil))) - (.set-assignment! state "storm1" assignment2) - (.set-assignment! state "storm3" assignment1) + (.set-assignment! state "storm1" assignment2 {}) + (.set-assignment! state "storm3" assignment1 {}) (is (= #{"storm1" "storm3"} (set (.assignments state nil)))) (is (= assignment2 (.assignment-info state "storm1" nil))) (is (= assignment1 (.assignment-info state "storm3" nil))) (is (= [] (.active-storms state))) - (.activate-storm! state "storm1" base1) + (.activate-storm! state "storm1" base1 {}) (is (= ["storm1"] (.active-storms state))) (is (= base1 (.storm-base state "storm1" nil))) (is (= nil (.storm-base state "storm2" nil))) - (.activate-storm! state "storm2" base2) + (.activate-storm! state "storm2" base2 {}) (is (= base1 (.storm-base state "storm1" nil))) (is (= base2 (.storm-base state "storm2" nil))) (is (= #{"storm1" "storm2"} (set (.active-storms state)))) http://git-wip-us.apache.org/repos/asf/storm/blob/22a96207/storm-core/test/clj/org/apache/storm/utils_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/utils_test.clj b/storm-core/test/clj/org/apache/storm/utils_test.clj index b59967a..0d1d67d 100644 --- a/storm-core/test/clj/org/apache/storm/utils_test.clj +++ b/storm-core/test/clj/org/apache/storm/utils_test.clj @@ -32,7 +32,7 @@ Config/STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING expected_ceiling}) servers ["bogus_server"] arbitrary_port 42 - curator (Utils/newCurator conf servers arbitrary_port nil) + curator (Utils/newCurator conf servers arbitrary_port nil nil) retry (-> curator .getZookeeperClient .getRetryPolicy) ] (is (.isAssignableFrom ExponentialBackoffRetry (.getClass retry)))