Repository: storm
Updated Branches:
  refs/heads/master 6cf8a9c99 -> 97e131d5a
[STORM-1696] status not sync if zk fails in backpressure

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a70195d7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a70195d7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a70195d7

Branch: refs/heads/master
Commit: a70195d717e1ed179425a0305b2e4279afeb6118
Parents: 6415863
Author: zhuoliu <[email protected]>
Authored: Wed Apr 13 13:04:57 2016 -0500
Committer: zhuoliu <[email protected]>
Committed: Wed Apr 13 13:04:57 2016 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++++++++++-------
 .../org/apache/storm/utils/DisruptorQueue.java | 4 ++++
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 8d30948..883630b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -145,16 +145,19 @@
           assignment-id (:assignment-id worker)
           port (:port worker)
           storm-cluster-state (:storm-cluster-state worker)
-          prev-backpressure-flag @(:backpressure worker)]
-      (when executors
-        (reset! (:backpressure worker)
-                (or @(:transfer-backpressure worker)
-                    (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))))
+          prev-backpressure-flag @(:backpressure worker)
+          ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
+          curr-backpressure-flag (if executors
+                                   (or (.getThrottleOn (:transfer-queue worker))
+                                     (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
+                                   prev-backpressure-flag)]
       ;; update the worker's backpressure flag to zookeeper only when it has changed
-      (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
-      (when (not= prev-backpressure-flag @(:backpressure worker))
+      (when (not= prev-backpressure-flag curr-backpressure-flag)
         (try
-          (.workerBackpressure storm-cluster-state storm-id assignment-id (long port) @(:backpressure worker))
+          (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
+          (.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag)
+          ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+          (reset! (:backpressure worker) curr-backpressure-flag)
           (catch Exception exc
             (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
       ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 4482297..d310337 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -544,4 +544,8 @@ public class DisruptorQueue implements IStatefulObject {
     public QueueMetrics getMetrics() {
         return _metrics;
     }
+
+    public boolean getThrottleOn() {
+        return _throttleOn;
+    }
 }
