Repository: storm
Updated Branches:
  refs/heads/master 6cf8a9c99 -> 97e131d5a


[STORM-1696] status not sync if zk fails in backpressure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a70195d7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a70195d7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a70195d7

Branch: refs/heads/master
Commit: a70195d717e1ed179425a0305b2e4279afeb6118
Parents: 6415863
Author: zhuoliu <[email protected]>
Authored: Wed Apr 13 13:04:57 2016 -0500
Committer: zhuoliu <[email protected]>
Committed: Wed Apr 13 13:04:57 2016 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj   | 19 +++++++++++--------
 .../org/apache/storm/utils/DisruptorQueue.java   |  4 ++++
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 8d30948..883630b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -145,16 +145,19 @@
             assignment-id (:assignment-id worker)
             port (:port worker)
             storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)]
-        (when executors
-          (reset! (:backpressure worker)
-                  (or @(:transfer-backpressure worker)
-                      (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) 
executors)))))
+            prev-backpressure-flag @(:backpressure worker)
+            ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
+            curr-backpressure-flag (if executors
+                                     (or (.getThrottleOn (:transfer-queue 
worker))
+                                       (reduce #(or %1 %2) (map 
#(.get-backpressure-flag %1) executors)))
+                                     prev-backpressure-flag)]
         ;; update the worker's backpressure flag to zookeeper only when it has changed
-        (log-debug "BP " @(:backpressure worker) " WAS " 
prev-backpressure-flag)
-        (when (not= prev-backpressure-flag @(:backpressure worker))
+        (when (not= prev-backpressure-flag curr-backpressure-flag)
           (try
-            (.workerBackpressure storm-cluster-state storm-id assignment-id 
(long port) @(:backpressure worker))
+            (log-debug "worker backpressure flag changing from " 
prev-backpressure-flag " to " curr-backpressure-flag)
+            (.workerBackpressure storm-cluster-state storm-id assignment-id 
(long port) curr-backpressure-flag)
+            ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+            (reset! (:backpressure worker) curr-backpressure-flag)
             (catch Exception exc
               (log-error exc "workerBackpressure update failed when connecting 
to ZK ... will retry"))))
         ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 4482297..d310337 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -544,4 +544,8 @@ public class DisruptorQueue implements IStatefulObject {
     public QueueMetrics getMetrics() {
         return _metrics;
     }
+
+       public boolean getThrottleOn() {
+           return _throttleOn;
+       }
 }

Reply via email to