Repository: storm Updated Branches: refs/heads/1.x-branch 4fe225cd0 -> fc64e158f
STORM-1614: backpressure init and cleanup changes for 1.x branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28b96d7d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28b96d7d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28b96d7d Branch: refs/heads/1.x-branch Commit: 28b96d7db2313e4fc5d47729416594928886c696 Parents: 821d4ef Author: Alessandro Bellina <abell...@yahoo-inc.com> Authored: Fri Mar 11 08:25:58 2016 -0600 Committer: Alessandro Bellina <abell...@yahoo-inc.com> Committed: Fri Mar 11 09:37:01 2016 -0600 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/cluster.clj | 14 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 15 ++- .../src/clj/org/apache/storm/daemon/worker.clj | 2 + .../test/clj/org/apache/storm/nimbus_test.clj | 129 +++++++++++++++++++ 4 files changed, 151 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj index 5315f1a..760f330 100644 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -73,6 +73,7 @@ (teardown-topology-errors! [this storm-id]) (heartbeat-storms [this]) (error-topologies [this]) + (backpressure-topologies [this]) (set-topology-log-config! [this storm-id log-config]) (topology-log-config [this storm-id cb]) (worker-heartbeat! [this storm-id node port info]) @@ -277,7 +278,7 @@ ;; this should never happen (exit-process! 
30 "Unknown callback for subtree " subtree args)))))] (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE - LOGCONFIG-SUBTREE]] + LOGCONFIG-SUBTREE BACKPRESSURE-ROOT]] (.mkdirs cluster-state p acls)) (reify StormClusterState @@ -368,6 +369,10 @@ [this] (.get_children cluster-state ERRORS-SUBTREE false)) + (backpressure-topologies + [this] + (.get_children cluster-state BACKPRESSURE-SUBTREE false)) + (get-worker-heartbeat [this storm-id node port] (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)] @@ -505,8 +510,11 @@ (remove-worker-backpressure! [this storm-id node port] - (.delete_node cluster-state (backpressure-path storm-id node port))) - + (let [path (backpressure-path storm-id node port) + existed (.node_exists cluster-state path false)] + (if existed + (.delete_node cluster-state (backpressure-path storm-id node port))))) + (teardown-topology-errors! [this storm-id] (try-cause http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 6145725..e79b94d 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -1043,12 +1043,13 @@ (filter [this key] (get-id-from-blob-key key)))] (set (.filterAndListKeys blob-store to-id)))) -(defn cleanup-storm-ids [conf storm-cluster-state blob-store] +(defn cleanup-storm-ids [storm-cluster-state blob-store] (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state)) error-ids (set (.error-topologies storm-cluster-state)) code-ids (code-ids blob-store) + backpressure-ids (set (.backpressure-topologies storm-cluster-state)) assigned-ids (set (.active-storms storm-cluster-state))] 
- (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids) + (set/difference (set/union heartbeat-ids error-ids backpressure-ids code-ids) assigned-ids) )) (defn extract-status-str [base] @@ -1120,6 +1121,9 @@ (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state) (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state)) +(defn force-delete-dir [conf id] + (rmr (master-stormdist-root conf id))) + (defn do-cleanup [nimbus] (if (is-leader nimbus :throw-exception false) (let [storm-cluster-state (:storm-cluster-state nimbus) @@ -1127,13 +1131,14 @@ submit-lock (:submit-lock nimbus) blob-store (:blob-store nimbus)] (let [to-cleanup-ids (locking submit-lock - (cleanup-storm-ids conf storm-cluster-state blob-store))] + (cleanup-storm-ids storm-cluster-state blob-store))] (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) - (rmr (master-stormdist-root conf id)) + (.remove-backpressure! storm-cluster-state id) + (force-delete-dir conf id) (blob-rm-topology-keys id blob-store storm-cluster-state) (swap! (:heartbeats-cache nimbus) dissoc id))))) (log-message "not a leader, skipping cleanup"))) @@ -1555,8 +1560,6 @@ )] (transition-name! nimbus storm-name [:kill wait-amt] true) (notify-topology-action-listener nimbus storm-name operation)) - (if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE) - (.remove-backpressure! 
(:storm-cluster-state nimbus) storm-id)) (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name) nimbus topology-conf))) http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 9d9c482..778e83d 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -688,6 +688,8 @@ (run-worker-shutdown-hooks worker) (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port) + (.remove-worker-backpressure! (:storm-cluster-state worker) storm-id assignment-id port) + (log-message "Disconnecting from storm cluster state context") (.disconnect (:storm-cluster-state worker)) (.close (:cluster-state worker)) http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 6b51d3c..0e6e4b6 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1514,3 +1514,132 @@ (is (= (.get_action (.get levels "other-test")) LogLevelAction/UNCHANGED)) (is (= (.get_target_log_level (.get levels "other-test")) "DEBUG"))))))) + +(defn teardown-heartbeats [id]) +(defn teardown-topo-errors [id]) +(defn teardown-backpressure-dirs [id]) + +(defn mock-cluster-state + ([] + (mock-cluster-state nil nil)) + ([active-topos inactive-topos] + (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos)) + ([active-topos hb-topos error-topos bp-topos] + (reify cluster/StormClusterState + 
(teardown-heartbeats! [this id] (teardown-heartbeats id)) + (teardown-topology-errors! [this id] (teardown-topo-errors id)) + (remove-backpressure! [this id] (teardown-backpressure-dirs id)) + (active-storms [this] active-topos) + (heartbeat-storms [this] hb-topos) + (error-topologies [this] error-topos) + (backpressure-topologies [this] bp-topos)))) + +(deftest cleanup-storm-ids-returns-inactive-topos + (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))] + (stubbing [nimbus/is-leader true + nimbus/code-ids {}] + (is (= (nimbus/cleanup-storm-ids mock-state nil) #{"topo2" "topo3"}))))) + +(deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes + (let [active-topos (list "hb1" "e2" "bp3") + hb-topos (list "hb1" "hb2" "hb3") + error-topos (list "e1" "e2" "e3") + bp-topos (list "bp1" "bp2" "bp3") + mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)] + (stubbing [nimbus/is-leader true + nimbus/code-ids {}] + (is (= (nimbus/cleanup-storm-ids mock-state nil) + #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"}))))) + +(deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active + (let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3") + hb-topos (list "hb1" "hb2" "hb3") + error-topos (list "e1" "e2" "e3") + bp-topos (list "bp1" "bp2" "bp3") + mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)] + (stubbing [nimbus/is-leader true + nimbus/code-ids {}] + (is (= (nimbus/cleanup-storm-ids mock-state nil) + #{}))))) + +(deftest do-cleanup-removes-inactive-znodes + (let [inactive-topos (list "topo2" "topo3") + hb-cache (atom (into {}(map vector inactive-topos '(nil nil)))) + mock-state (mock-cluster-state) + mock-blob-store {} + conf {} + nimbus {:conf conf + :submit-lock mock-blob-store + :blob-store {} + :storm-cluster-state mock-state + :heartbeats-cache hb-cache}] + + (stubbing [nimbus/is-leader true + nimbus/blob-rm-topology-keys nil + nimbus/cleanup-storm-ids 
inactive-topos] + (mocking + [teardown-heartbeats + teardown-topo-errors + teardown-backpressure-dirs + nimbus/force-delete-dir + nimbus/blob-rm-topology-keys] + + (nimbus/do-cleanup nimbus) + + ;; removed heartbeats znode + (verify-nth-call-args-for 1 teardown-heartbeats "topo2") + (verify-nth-call-args-for 2 teardown-heartbeats "topo3") + + ;; removed topo errors znode + (verify-nth-call-args-for 1 teardown-topo-errors "topo2") + (verify-nth-call-args-for 2 teardown-topo-errors "topo3") + + ;; removed backpressure znodes + (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2") + (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3") + + ;; removed topo directories + (verify-nth-call-args-for 1 nimbus/force-delete-dir conf "topo2") + (verify-nth-call-args-for 2 nimbus/force-delete-dir conf "topo3") + + ;; removed blob store topo keys + (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" mock-blob-store mock-state) + (verify-nth-call-args-for 2 nimbus/blob-rm-topology-keys "topo3" mock-blob-store mock-state) + + ;; remove topos from heartbeat cache + (is (= (count @hb-cache) 0)))))) + +(deftest do-cleanup-does-not-teardown-active-topos + (let [inactive-topos () + hb-cache (atom {"topo1" nil "topo2" nil}) + mock-state (mock-cluster-state) + mock-blob-store {} + conf {} + nimbus {:conf conf + :submit-lock mock-blob-store + :blob-store {} + :storm-cluster-state mock-state + :heartbeats-cache hb-cache}] + + (stubbing [nimbus/is-leader true + nimbus/blob-rm-topology-keys nil + nimbus/cleanup-storm-ids inactive-topos] + (mocking + [teardown-heartbeats + teardown-topo-errors + teardown-backpressure-dirs + nimbus/force-delete-dir + nimbus/blob-rm-topology-keys] + + (nimbus/do-cleanup nimbus) + + (verify-call-times-for teardown-heartbeats 0) + (verify-call-times-for teardown-topo-errors 0) + (verify-call-times-for teardown-backpressure-dirs 0) + (verify-call-times-for nimbus/force-delete-dir 0) + (verify-call-times-for 
nimbus/blob-rm-topology-keys 0) + + ;; hb-cache keeps both entries because no topologies were inactive + (is (= (count @hb-cache) 2)) + (is (contains? @hb-cache "topo1")) + (is (contains? @hb-cache "topo2"))))))