Repository: storm Updated Branches: refs/heads/1.x-branch c1a1511f1 -> 49c2fc39f
STORM-2901: Reuse ZK connection for Nimbus for 1.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60dfa7d1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60dfa7d1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60dfa7d1 Branch: refs/heads/1.x-branch Commit: 60dfa7d1b3c8e2500a64db37777de38393354046 Parents: 25fa9dd Author: chenyuzhao <[email protected]> Authored: Tue Jan 23 12:42:46 2018 +0800 Committer: chenyuzhao <[email protected]> Committed: Tue Jan 23 12:42:46 2018 +0800 ---------------------------------------------------------------------- .../apache/storm/command/shell_submission.clj | 7 +++- .../src/clj/org/apache/storm/daemon/nimbus.clj | 36 ++++++++++++++------ .../src/clj/org/apache/storm/zookeeper.clj | 10 +++--- .../apache/storm/blobstore/BlobStoreUtils.java | 6 ++-- .../storm/blobstore/BlobSynchronizer.java | 16 ++++----- .../storm/blobstore/KeySequenceNumber.java | 25 +++++--------- .../storm/zookeeper/LeaderElectorImp.java | 3 +- .../org/apache/storm/zookeeper/Zookeeper.java | 22 ++++++++---- 8 files changed, 72 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 887ab3b..3efcc14 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -23,7 +23,12 @@ (defn -main [^String tmpjarpath & args] (let [conf (read-storm-config) ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok - zk-leader-elector (zk-leader-elector conf nil) + zk (mk-client conf + (conf STORM-ZOOKEEPER-SERVERS) + (conf STORM-ZOOKEEPER-PORT) + :root (conf STORM-ZOOKEEPER-ROOT) + :auth-conf conf) + zk-leader-elector (zk-leader-elector conf zk nil) leader-nimbus (.getLeader zk-leader-elector) host (.getHost leader-nimbus) port (.getPort leader-nimbus) http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 7607b1b..bc72b29 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -133,6 +133,13 @@ scheduler )) +(defn mk-zk-client [conf] + (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS) + zk-port (conf STORM-ZOOKEEPER-PORT) + zk-root (conf STORM-ZOOKEEPER-ROOT)] + (if (and zk-servers zk-port) + (mk-client conf zk-servers zk-port :root zk-root :auth-conf conf)))) + (defmulti blob-sync cluster-mode) (defnk is-leader [nimbus :throw-exception true] @@ -183,7 +190,8 @@ (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus) - blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))] + blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf)) + zk-client (mk-zk-client conf)] {:conf conf :nimbus-host-port-info (NimbusInfo/fromConf conf) :inimbus inimbus @@ -213,7 +221,8 @@ (exit-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) - :leader-elector (zk-leader-elector conf blob-store) + :zk-client zk-client + :leader-elector (zk-leader-elector conf zk-client blob-store) :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies @@ -452,9 +461,9 @@ supervisor-ids)) ))) -(defn- get-version-for-key [key nimbus-host-port-info conf] +(defn- get-version-for-key [key nimbus-host-port-info zk-client] (let [version (KeySequenceNumber. key nimbus-host-port-info)] - (.getKeySequenceNumber version conf))) + (.getKeySequenceNumber version zk-client))) (defn get-key-seq-from-blob-store [blob-store] (let [key-iter (.listKeys blob-store)] @@ -464,6 +473,7 @@ (let [subject (get-subject) storm-cluster-state (:storm-cluster-state nimbus) blob-store (:blob-store nimbus) + zk-client (:zk-client nimbus) jar-key (master-stormjar-key storm-id) code-key (master-stormcode-key storm-id) conf-key (master-stormconf-key storm-id) @@ -471,13 +481,13 @@ (when tmp-jar-location ;;in local mode there is no jar (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf)))) + (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info zk-client)))) (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf))) + (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info zk-client))) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf))))) + (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info zk-client))))) (defn- read-storm-topology [storm-id blob-store] (Utils/deserialize @@ -1115,7 +1125,7 @@ (defn try-read-storm-conf [conf storm-id blob-store] (try-cause - (read-storm-conf-as-nimbus conf storm-id blob-store) + (read-storm-conf-as-nimbus storm-id blob-store) (catch KeyNotFoundException e (throw (NotAliveException. (str storm-id)))))) @@ -1307,6 +1317,7 @@ "Sets up blobstore state for all current keys." (let [storm-cluster-state (:storm-cluster-state nimbus) blob-store (:blob-store nimbus) + zk-client (:zk-client nimbus) local-set-of-keys (set (get-key-seq-from-blob-store blob-store)) all-keys (set (.active-keys storm-cluster-state)) locally-available-active-keys (set/intersection local-set-of-keys all-keys) @@ -1319,7 +1330,7 @@ (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys) (doseq [key locally-available-active-keys] (try - (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf)) + (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info zk-client)) (catch KeyNotFoundException _ ; invalid key, remove it from blobstore (.deleteBlob blob-store key nimbus-subject)))))) @@ -1473,6 +1484,7 @@ (defmethod blob-sync :distributed [conf nimbus] (if (not (is-leader nimbus :throw-exception false)) (let [storm-cluster-state (:storm-cluster-state nimbus) + zk-client (:zk-client nimbus) nimbus-host-port-info (:nimbus-host-port-info nimbus) blob-store-key-set (set (get-key-seq-from-blob-store (:blob-store nimbus))) zk-key-set (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))] @@ -1481,7 +1493,8 @@ (BlobSynchronizer. (:blob-store nimbus) conf) (.setNimbusInfo nimbus-host-port-info) (.setBlobStoreKeySet blob-store-key-set) - (.setZookeeperKeySet zk-key-set))] + (.setZookeeperKeySet zk-key-set) + (.setZkClient zk-client))] (.syncBlobs sync-blobs))))) (defmethod blob-sync :local [conf nimbus] @@ -2096,10 +2109,11 @@ (^void createStateInZookeeper [this ^String blob-key] (let [storm-cluster-state (:storm-cluster-state nimbus) blob-store (:blob-store nimbus) + zk-client (:zk-client nimbus) nimbus-host-port-info (:nimbus-host-port-info nimbus) conf (:conf nimbus)] (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) + (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info zk-client))) (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info))) (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk] http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj index ca41093..12e8ad7 100644 --- a/storm-core/src/clj/org/apache/storm/zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj @@ -252,10 +252,8 @@ (defn zk-leader-elector "Zookeeper Implementation of ILeaderElector." - [conf blob-store] - (let [servers (conf STORM-ZOOKEEPER-SERVERS) - zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf) - leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") + [conf zk blob-store] + (let [leader-lock-path "/leader-lock" id (.toHostPortString (NimbusInfo/fromConf conf)) leader-latch (atom (LeaderLatch. zk leader-lock-path id)) leader-latch-listener (atom (Zookeeper/leaderLatchListenerImpl conf zk blob-store @leader-latch)) @@ -302,5 +300,5 @@ participants))) (^void close[this] - (log-message "closing zookeeper connection of leader elector.") - (.close zk))))) + ;;Do nothing now. + )))) http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java index f1eb2f4..569aef2 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java @@ -51,6 +51,10 @@ public class BlobStoreUtils { private static final String BLOB_DEPENDENCIES_PREFIX = "dep-"; private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class); + public static String getBlobStoreSubtree() { + return BLOBSTORE_SUBTREE; + } + public static CuratorFramework createZKClient(Map conf) { List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); @@ -285,6 +289,4 @@ public class BlobStoreUtils { } return fileName; } - - } http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java index f035709..b581e12 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java @@ -17,16 +17,16 @@ */ package org.apache.storm.blobstore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - /** * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper * for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus. @@ -57,6 +57,10 @@ public class BlobSynchronizer { this.blobStoreKeySet = blobStoreKeySet; } + public void setZkClient(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + public Set<String> getBlobStoreKeySet() { Set<String> keySet = new HashSet<String>(); keySet.addAll(blobStoreKeySet); @@ -72,7 +76,6 @@ public class BlobSynchronizer { public synchronized void syncBlobs() { try { LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); - zkClient = BlobStoreUtils.createZKClient(conf); deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); updateKeySetForBlobStore(getBlobStoreKeySet()); Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); @@ -89,9 +92,6 @@ public class BlobSynchronizer { LOG.debug("Detected deletion for the key {} while downloading - skipping download", key); } } - if (zkClient !=null) { - zkClient.close(); - } } catch(InterruptedException exp) { LOG.error("InterruptedException {}", exp); } catch(Exception exp) { http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java index adbd4c4..570e0ad 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java @@ -18,21 +18,20 @@ package org.apache.storm.blobstore; +import java.nio.ByteBuffer; +import java.util.TreeSet; +import java.util.List; +import java.util.Map; + +import org.apache.curator.framework.CuratorFramework; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; -import org.apache.storm.utils.Utils; -import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.TreeSet; -import java.util.Map; -import java.util.List; - /** * Class hands over the key sequence number which implies the number of updates made to a blob. * The information regarding the keys and the sequence number which represents the number of updates are @@ -120,7 +119,6 @@ import java.util.List; */ public class KeySequenceNumber { private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class); - private final String BLOBSTORE_SUBTREE="/blobstore"; private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber"; private final String key; private final NimbusInfo nimbusInfo; @@ -132,12 +130,11 @@ public class KeySequenceNumber { this.nimbusInfo = nimbusInfo; } - public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException { + public synchronized int getKeySequenceNumber(CuratorFramework zkClient) throws KeyNotFoundException { TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); - CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); try { // Key has not been created yet and it is the first time it is being created - if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { + if (zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key) == null) { zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, @@ -148,7 +145,7 @@ public class KeySequenceNumber { // When all nimbodes go down and one or few of them come up // Unfortunately there might not be an exact way to know which one contains the most updated blob, // if all go down which is unlikely. Hence there might be a need to update the blob if all go down. - List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key); + List<String> stateInfoList = zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key); LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList); if (stateInfoList.isEmpty()) { return getMaxSequenceNumber(zkClient); @@ -208,10 +205,6 @@ public class KeySequenceNumber { // in other case, just set this to 0 to trigger re-sync later LOG.error("Exception {}", e); return INITIAL_SEQUENCE_NUMBER - 1; - } finally { - if (zkClient != null) { - zkClient.close(); - } } } http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java index 74816c2..2f61430 100644 --- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java @@ -118,7 +118,6 @@ public class LeaderElectorImp implements ILeaderElector { @Override public void close() { - LOG.info("closing zookeeper connection of leader elector."); - zk.close(); + //Do nothing now. } } http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java index a2ad797..b2e2236 100644 --- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java @@ -346,7 +346,7 @@ public class Zookeeper { @Override public void isLeader() { - Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false)); + Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, ClusterUtils.STORMS_SUBTREE, false)); Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds); Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys); @@ -454,15 +454,23 @@ public class Zookeeper { }; } - public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException { - return _instance.zkLeaderElectorImpl(conf, blobStore); + /** + * Get master leader elector. + * @param conf Config. + * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path. + * @param blobStore {@link BlobStore} + * @return Instance of {@link ILeaderElector} + * @throws UnknownHostException + */ + public static ILeaderElector zkLeaderElector(Map conf, CuratorFramework zkClient, + BlobStore blobStore) throws UnknownHostException { + return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore); } - protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException { + protected ILeaderElector zkLeaderElectorImpl(Map conf, CuratorFramework zk, BlobStore blobStore) + throws UnknownHostException { List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); - Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); - CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf); - String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock"; + String leaderLockPath = "/leader-lock"; String id = NimbusInfo.fromConf(conf).toHostPortString(); AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
