Repository: storm
Updated Branches:
  refs/heads/1.x-branch b511a8b4f -> 3a4825ed9


[STORM-1696]-1.x-branch status not sync if zk fails in backpressure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9271056b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9271056b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9271056b

Branch: refs/heads/1.x-branch
Commit: 9271056b22ab5c734157a9ca1f3f4ab9a28d4b4b
Parents: 6af0d10
Author: zhuol <[email protected]>
Authored: Thu Apr 7 18:12:33 2016 -0500
Committer: zhuol <[email protected]>
Committed: Thu Apr 7 18:12:33 2016 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj   | 19 +++++++++++--------
 .../org/apache/storm/utils/DisruptorQueue.java   |  8 ++++++--
 2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 778e83d..b8bc423 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -138,16 +138,19 @@
             assignment-id (:assignment-id worker)
             port (:port worker)
             storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)]
-        (when executors
-          (reset! (:backpressure worker)
-                  (or @(:transfer-backpressure worker)
-                      (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) 
executors)))))
+            prev-backpressure-flag @(:backpressure worker)
+            ;; the backpressure flag is true if at least one of the disruptor 
queues has throttle-on
+            curr-backpressure-flag (if executors
+                                     (or (.getThrottleOn (:transfer-queue 
worker))
+                                       (reduce #(or %1 %2) (map 
#(.get-backpressure-flag %1) executors)))
+                                     prev-backpressure-flag)]
         ;; update the worker's backpressure flag to zookeeper only when it has 
changed
-        (log-debug "BP " @(:backpressure worker) " WAS " 
prev-backpressure-flag)
-        (when (not= prev-backpressure-flag @(:backpressure worker))
+        (when (not= prev-backpressure-flag curr-backpressure-flag)
           (try
-            (.worker-backpressure! storm-cluster-state storm-id assignment-id 
port @(:backpressure worker))
+            (log-debug "worker backpressure flag changing from " 
prev-backpressure-flag " to " curr-backpressure-flag)
+            (.worker-backpressure! storm-cluster-state storm-id assignment-id 
port curr-backpressure-flag)
+            ;; doing the local reset after the zk update succeeds is very 
important to avoid a bad state upon zk exception
+            (reset! (:backpressure worker) curr-backpressure-flag)
             (catch Exception exc
               (log-error exc "workerBackpressure update failed when connecting 
to ZK ... will retry"))))
         ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 19aba06..9f39d06 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -146,8 +146,8 @@ public class DisruptorQueue implements IStatefulObject {
             if (_enableBackpressure && _cb != null && (_metrics.population() + 
_overflowCount.get()) >= _highWaterMark) {
                 try {
                     if (!_throttleOn) {
-                        _cb.highWaterMark();
                         _throttleOn = true;
+                        _cb.highWaterMark();
                     }
                 } catch (Exception e) {
                     throw new RuntimeException("Exception during calling 
highWaterMark callback!", e);
@@ -200,8 +200,8 @@ public class DisruptorQueue implements IStatefulObject {
             if (_enableBackpressure && _cb != null && (_metrics.population() + 
_overflowCount.get()) >= _highWaterMark) {
                 try {
                     if (!_throttleOn) {
-                        _cb.highWaterMark();
                         _throttleOn = true;
+                        _cb.highWaterMark();
                     }
                 } catch (Exception e) {
                     throw new RuntimeException("Exception during calling 
highWaterMark callback!", e);
@@ -537,6 +537,10 @@ public class DisruptorQueue implements IStatefulObject {
         return this;
     }
 
+    public boolean getThrottleOn() {
+        return _throttleOn;
+    }
+
     //This method enables the metrics to be accessed from outside of the 
DisruptorQueue class
     public QueueMetrics getMetrics() {
         return _metrics;

Reply via email to