Repository: storm
Updated Branches:
  refs/heads/1.x-branch 4fe225cd0 -> fc64e158f


STORM-1614: backpressure init and cleanup changes for 1.x branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28b96d7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28b96d7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28b96d7d

Branch: refs/heads/1.x-branch
Commit: 28b96d7db2313e4fc5d47729416594928886c696
Parents: 821d4ef
Author: Alessandro Bellina <abell...@yahoo-inc.com>
Authored: Fri Mar 11 08:25:58 2016 -0600
Committer: Alessandro Bellina <abell...@yahoo-inc.com>
Committed: Fri Mar 11 09:37:01 2016 -0600

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/cluster.clj |  14 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  15 ++-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   2 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 129 +++++++++++++++++++
 4 files changed, 151 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj 
b/storm-core/src/clj/org/apache/storm/cluster.clj
index 5315f1a..760f330 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -73,6 +73,7 @@
   (teardown-topology-errors! [this storm-id])
   (heartbeat-storms [this])
   (error-topologies [this])
+  (backpressure-topologies [this])
   (set-topology-log-config! [this storm-id log-config])
   (topology-log-config [this storm-id cb])
   (worker-heartbeat! [this storm-id node port info])
@@ -277,7 +278,7 @@
                          ;; this should never happen
                          (exit-process! 30 "Unknown callback for subtree " 
subtree args)))))]
     (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
-               LOGCONFIG-SUBTREE]]
+               LOGCONFIG-SUBTREE BACKPRESSURE-ROOT]]
       (.mkdirs cluster-state p acls))
     (reify
       StormClusterState
@@ -368,6 +369,10 @@
         [this]
         (.get_children cluster-state ERRORS-SUBTREE false))
 
+      (backpressure-topologies
+        [this]
+        (.get_children cluster-state BACKPRESSURE-SUBTREE false))
+
       (get-worker-heartbeat
         [this storm-id node port]
         (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path 
storm-id node port) false)]
@@ -505,8 +510,11 @@
 
       (remove-worker-backpressure!
         [this storm-id node port]
-        (.delete_node cluster-state (backpressure-path storm-id node port)))
-
+        (let [path (backpressure-path storm-id node port)
+              existed (.node_exists cluster-state path false)]
+          (if existed
+            (.delete_node cluster-state (backpressure-path storm-id node 
port)))))
+    
       (teardown-topology-errors!
         [this storm-id]
         (try-cause

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 6145725..e79b94d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1043,12 +1043,13 @@
                 (filter [this key] (get-id-from-blob-key key)))]
     (set (.filterAndListKeys blob-store to-id))))
 
-(defn cleanup-storm-ids [conf storm-cluster-state blob-store]
+(defn cleanup-storm-ids [storm-cluster-state blob-store]
   (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
         error-ids (set (.error-topologies storm-cluster-state))
         code-ids (code-ids blob-store)
+        backpressure-ids (set (.backpressure-topologies storm-cluster-state))
         assigned-ids (set (.active-storms storm-cluster-state))]
-    (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
+    (set/difference (set/union heartbeat-ids error-ids backpressure-ids 
code-ids) assigned-ids)
     ))
 
 (defn extract-status-str [base]
@@ -1120,6 +1121,9 @@
   (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
   (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
 
+(defn force-delete-dir [conf id]
+  (rmr (master-stormdist-root conf id)))
+
 (defn do-cleanup [nimbus]
   (if (is-leader nimbus :throw-exception false)
     (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1127,13 +1131,14 @@
           submit-lock (:submit-lock nimbus)
           blob-store (:blob-store nimbus)]
       (let [to-cleanup-ids (locking submit-lock
-                             (cleanup-storm-ids conf storm-cluster-state 
blob-store))]
+                             (cleanup-storm-ids storm-cluster-state 
blob-store))]
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
-            (rmr (master-stormdist-root conf id))
+            (.remove-backpressure! storm-cluster-state id)
+            (force-delete-dir conf id)
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))
     (log-message "not a leader, skipping cleanup")))
@@ -1555,8 +1560,6 @@
                            )]
             (transition-name! nimbus storm-name [:kill wait-amt] true)
             (notify-topology-action-listener nimbus storm-name operation))
-          (if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.remove-backpressure! (:storm-cluster-state nimbus) storm-id))
           (add-topology-to-history-log (get-storm-id (:storm-cluster-state 
nimbus) storm-name)
             nimbus topology-conf)))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9d9c482..778e83d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -688,6 +688,8 @@
                     (run-worker-shutdown-hooks worker)
 
                     (.remove-worker-heartbeat! (:storm-cluster-state worker) 
storm-id assignment-id port)
+                    (.remove-worker-backpressure! (:storm-cluster-state 
worker) storm-id assignment-id port)
+
                     (log-message "Disconnecting from storm cluster state 
context")
                     (.disconnect (:storm-cluster-state worker))
                     (.close (:cluster-state worker))

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 6b51d3c..0e6e4b6 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1514,3 +1514,132 @@
 
            (is (= (.get_action (.get levels "other-test")) 
LogLevelAction/UNCHANGED))
            (is (= (.get_target_log_level (.get levels "other-test")) 
"DEBUG")))))))
+
+(defn teardown-heartbeats [id])
+(defn teardown-topo-errors [id])
+(defn teardown-backpressure-dirs [id])
+
+(defn mock-cluster-state 
+  ([] 
+    (mock-cluster-state nil nil))
+  ([active-topos inactive-topos]
+    (mock-cluster-state active-topos inactive-topos inactive-topos 
inactive-topos)) 
+  ([active-topos hb-topos error-topos bp-topos]
+    (reify cluster/StormClusterState
+      (teardown-heartbeats! [this id] (teardown-heartbeats id))
+      (teardown-topology-errors! [this id] (teardown-topo-errors id))
+      (remove-backpressure! [this id] (teardown-backpressure-dirs id))
+      (active-storms [this] active-topos)
+      (heartbeat-storms [this] hb-topos)
+      (error-topologies [this] error-topos)
+      (backpressure-topologies [this] bp-topos))))
+
+(deftest cleanup-storm-ids-returns-inactive-topos
+  (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" 
"topo3"))]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) #{"topo2" "topo3"})))))
+
+(deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes
+  (let [active-topos (list "hb1" "e2" "bp3")
+        hb-topos (list "hb1" "hb2" "hb3")
+        error-topos (list "e1" "e2" "e3")
+        bp-topos (list "bp1" "bp2" "bp3")
+        mock-state (mock-cluster-state active-topos hb-topos error-topos 
bp-topos)]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
+           #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"})))))
+
+(deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active
+  (let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3")
+        hb-topos (list "hb1" "hb2" "hb3")
+        error-topos (list "e1" "e2" "e3")
+        bp-topos (list "bp1" "bp2" "bp3")
+        mock-state (mock-cluster-state active-topos hb-topos error-topos 
bp-topos)]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
+           #{})))))
+
+(deftest do-cleanup-removes-inactive-znodes
+  (let [inactive-topos (list "topo2" "topo3")
+        hb-cache (atom (into {}(map vector inactive-topos '(nil nil))))
+        mock-state (mock-cluster-state)
+        mock-blob-store {}
+        conf {}
+        nimbus {:conf conf
+                :submit-lock mock-blob-store 
+                :blob-store {}
+                :storm-cluster-state mock-state
+                :heartbeats-cache hb-cache}]
+
+    (stubbing [nimbus/is-leader true
+               nimbus/blob-rm-topology-keys nil
+               nimbus/cleanup-storm-ids inactive-topos]
+      (mocking
+        [teardown-heartbeats 
+         teardown-topo-errors 
+         teardown-backpressure-dirs
+         nimbus/force-delete-dir
+         nimbus/blob-rm-topology-keys] 
+
+        (nimbus/do-cleanup nimbus)
+
+        ;; removed heartbeats znode
+        (verify-nth-call-args-for 1 teardown-heartbeats "topo2")
+        (verify-nth-call-args-for 2 teardown-heartbeats "topo3")
+
+        ;; removed topo errors znode
+        (verify-nth-call-args-for 1 teardown-topo-errors "topo2")
+        (verify-nth-call-args-for 2 teardown-topo-errors "topo3")
+
+        ;; removed backpressure znodes
+        (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2")
+        (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
+
+        ;; removed topo directories
+        (verify-nth-call-args-for 1 nimbus/force-delete-dir conf "topo2")
+        (verify-nth-call-args-for 2 nimbus/force-delete-dir conf "topo3")
+
+        ;; removed blob store topo keys
+        (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" 
mock-blob-store mock-state)
+        (verify-nth-call-args-for 2 nimbus/blob-rm-topology-keys "topo3" 
mock-blob-store mock-state)
+
+        ;; remove topos from heartbeat cache
+        (is (= (count @hb-cache) 0))))))
+
+(deftest do-cleanup-does-not-teardown-active-topos
+  (let [inactive-topos ()
+        hb-cache (atom {"topo1" nil "topo2" nil})
+        mock-state (mock-cluster-state)
+        mock-blob-store {}
+        conf {}
+        nimbus {:conf conf
+                :submit-lock mock-blob-store 
+                :blob-store {}
+                :storm-cluster-state mock-state
+                :heartbeats-cache hb-cache}]
+
+    (stubbing [nimbus/is-leader true
+               nimbus/blob-rm-topology-keys nil
+               nimbus/cleanup-storm-ids inactive-topos]
+      (mocking
+        [teardown-heartbeats 
+         teardown-topo-errors 
+         teardown-backpressure-dirs
+         nimbus/force-delete-dir
+         nimbus/blob-rm-topology-keys] 
+
+        (nimbus/do-cleanup nimbus)
+
+        (verify-call-times-for teardown-heartbeats 0)
+        (verify-call-times-for teardown-topo-errors 0)
+        (verify-call-times-for teardown-backpressure-dirs 0)
+        (verify-call-times-for nimbus/force-delete-dir 0)
+        (verify-call-times-for nimbus/blob-rm-topology-keys 0)
+
+        ;; hb-cache keeps both entries because no topos were inactive
+        (is (= (count @hb-cache) 2))
+        (is (contains? @hb-cache "topo1"))
+        (is (contains? @hb-cache "topo2"))))))

Reply via email to