Repository: storm Updated Branches: refs/heads/1.x-branch 49c2fc39f -> 33f543cf6
Adding backpressure timeout, backpressure znodes cleanup, Do not delete backpressure ephemeral node frequently Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd04a556 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd04a556 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd04a556 Branch: refs/heads/1.x-branch Commit: dd04a5563317fa6f57d3d7ec32190940b98454d7 Parents: 58ae04b Author: Kishor Patil <[email protected]> Authored: Mon Jan 22 15:47:42 2018 -0500 Committer: Kishor Patil <[email protected]> Committed: Mon Jan 22 15:47:52 2018 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 2 + storm-core/src/clj/org/apache/storm/cluster.clj | 63 ++++++++++++++---- .../src/clj/org/apache/storm/daemon/worker.clj | 68 ++++++++++++-------- storm-core/src/jvm/org/apache/storm/Config.java | 17 +++++ .../test/clj/org/apache/storm/cluster_test.clj | 15 +++++ 5 files changed, 123 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index f89211b..2bd7855 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -194,6 +194,8 @@ task.backpressure.poll.secs: 30 topology.backpressure.enable: false backpressure.disruptor.high.watermark: 0.9 backpressure.disruptor.low.watermark: 0.4 +backpressure.znode.timeout.secs: 30 +backpressure.znode.update.freq.secs: 15 zmq.threads: 1 zmq.linger.millis: 5000 http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj index 810b3c3..eafa40b 100644 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -18,7 +18,8 @@ (:import [org.apache.zookeeper.data Stat ACL Id] [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary LogConfig ProfileAction ProfileRequest NodeInfo] - [java.io Serializable]) + [java.io Serializable] + [java.nio ByteBuffer]) (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) (:import [org.apache.curator.framework CuratorFramework]) (:import [org.apache.storm.utils Utils]) @@ -80,7 +81,7 @@ (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (worker-backpressure! [this storm-id node port info]) - (topology-backpressure [this storm-id callback]) + (topology-backpressure [this storm-id timeout-ms callback]) (setup-backpressure! [this storm-id]) (remove-backpressure! [this storm-id]) (remove-worker-backpressure! [this storm-id node port]) @@ -172,6 +173,10 @@ [storm-id node port] (str (backpressure-storm-root storm-id) "/" node "-" port)) +(defn backpressure-full-path + [storm-id short-path] + (str (backpressure-storm-root storm-id) "/" short-path)) + (defn error-storm-root [storm-id] (str ERRORS-SUBTREE "/" storm-id)) @@ -242,6 +247,20 @@ :stats (get executor-stats t)}}))) (into {})))) + +(defn max-timestamp + "Reduces the timestamps (e.g. those set by worker-backpressure!) + to the most recent timestamp" + [cluster-state storm-id paths] + (reduce (fn [acc path] + (let [data (.get_data cluster-state (backpressure-full-path storm-id path) false) + timestamp (if data + (.. (ByteBuffer/wrap data) (getLong)) + 0)] + (Math/max acc timestamp))) + 0 + paths)) + ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called. (defnk mk-storm-cluster-state [cluster-state-spec :acls nil :context (ClusterStateContext.)] @@ -483,27 +502,37 @@ (log-warn-error e "Could not teardown heartbeats for " storm-id)))) (worker-backpressure! - [this storm-id node port on?] - "if znode exists and to be not on?, delete; if exists and on?, do nothing; - if not exists and to be on?, create; if not exists and not on?, do nothing" + [this storm-id node port timestamp] + "If znode exists and timestamp is non-positive, ignore; + if exists and timestamp is larger than 0, update the timestamp; + if not exists and timestamp is larger than 0, create the znode and set the timestamp; + if not exists and timestamp is non-positive, do nothing." (let [path (backpressure-path storm-id node port) existed (.node_exists cluster-state path false)] (if existed - (if (not on?) - (.delete_node cluster-state path)) ;; delete the znode since the worker is not congested - (if on? - (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested + (if-not (<= timestamp 0) + (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))] + (.set_data cluster-state path bytes acls))) + (when timestamp + (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))] + (.set_ephemeral_node cluster-state path bytes acls)))))) ;; create the znode since worker is congested (topology-backpressure - [this storm-id callback] + [this storm-id timeout-ms callback] "if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off. + But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off. + This will prevent the spouts from getting stuck indefinitely if something wrong happens. The backpressure/storm-id dir may not exist if nimbus has shutdown the topology" (when callback (swap! backpressure-callback assoc storm-id callback)) (let [path (backpressure-storm-root storm-id) children (if (.node_exists cluster-state path false) - (.get_children cluster-state path (not-nil? callback))) ] - (> (count children) 0))) + (.get_children cluster-state path (not-nil? callback))) + most-recent-backpressure (max-timestamp cluster-state storm-id children) + current-time (System/currentTimeMillis) + ret (> timeout-ms (- current-time most-recent-backpressure))] + (log-debug "topology backpressure is " (if ret "on" "off")) + ret)) (setup-backpressure! [this storm-id] @@ -511,14 +540,20 @@ (remove-backpressure! [this storm-id] - (.delete_node cluster-state (backpressure-storm-root storm-id))) + (try-cause + (.delete_node cluster-state (backpressure-storm-root storm-id)) + (catch KeeperException e + (log-warn-error e "Could not teardown backpressure for " storm-id)))) (remove-worker-backpressure! [this storm-id node port] (let [path (backpressure-path storm-id node port) existed (.node_exists cluster-state path false)] (if existed - (.delete_node cluster-state (backpressure-path storm-id node port))))) + (try-cause + (.delete_node cluster-state (backpressure-path storm-id node port)) + (catch KeeperException e + (log-warn-error e "Could not teardown backpressure for " storm-id)))))) (teardown-topology-errors! [this storm-id] http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 6626272..633a61d 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -131,30 +131,39 @@ (let [tuple (.getTuple addressed-tuple)] (.serialize serializer tuple)))) -(defn- mk-backpressure-handler [executors] - "make a handler that checks and updates worker's backpressure flag" - (disruptor/worker-backpressure-handler - (fn [worker] - (let [storm-id (:storm-id worker) - assignment-id (:assignment-id worker) - port (:port worker) - storm-cluster-state (:storm-cluster-state worker) - prev-backpressure-flag @(:backpressure worker) - ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on - curr-backpressure-flag (if executors - (or (.getThrottleOn (:transfer-queue worker)) - (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))) - prev-backpressure-flag)] - ;; update the worker's backpressure flag to zookeeper only when it has changed - (when (not= prev-backpressure-flag curr-backpressure-flag) - (try - (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag) - (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag) - ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception - (reset! (:backpressure worker) curr-backpressure-flag) - (catch Exception exc - (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry")))) - )))) +(defn should-trigger-backpressure [executors worker] + (or (.getThrottleOn (:transfer-queue worker)) + (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))) + +(defn- mk-backpressure-handler [executors topo-conf] + "make a handler that checks and updates worker's backpressure timestamp" + (let [update-freq-ms (* (topo-conf BACKPRESSURE-ZNODE-UPDATE-FREQ-SECS) 1000)] + (disruptor/worker-backpressure-handler + (if executors + (fn [worker] + (let [storm-id (:storm-id worker) + assignment-id (:assignment-id worker) + port (:port worker) + storm-cluster-state (:storm-cluster-state worker) + prev-backpressure-timestamp @(:backpressure worker) + curr-timestamp (System/currentTimeMillis) + ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on + curr-backpressure-timestamp (if (should-trigger-backpressure executors worker) + ;; Update the backpressure timestamp every update-freq-ms seconds + (if (> (- curr-timestamp (or prev-backpressure-timestamp 0)) update-freq-ms) + curr-timestamp + prev-backpressure-timestamp) + 0)] + ;; update the worker's backpressure timestamp to zookeeper only when it has changed + (when (not= prev-backpressure-timestamp curr-backpressure-timestamp) + (try + (log-debug "worker backpressure timestamp changing from " prev-backpressure-timestamp " to " curr-backpressure-timestamp) + (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-timestamp) + ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception + (reset! (:backpressure worker) curr-backpressure-timestamp) + (catch Exception exc + (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry")))))) + (fn [workers]))))) (defn- mk-disruptor-backpressure-handler [worker] "make a handler for the worker's send disruptor queue to @@ -317,7 +326,7 @@ :transfer-fn (mk-transfer-fn <>) :load-mapping (LoadMapping.) :assignment-versions assignment-versions - :backpressure (atom false) ;; whether this worker is going slow + :backpressure (atom 0) ;; whether this worker is going slow. non-positive means turning off backpressure :backpressure-trigger (Object.) ;; a trigger for synchronization with executors :throttle-on (atom false) ;; whether throttle is activated for spouts ))) @@ -647,15 +656,18 @@ _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK)) (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK)) (.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE))) - backpressure-handler (mk-backpressure-handler @executors) + backpressure-handler (mk-backpressure-handler @executors storm-conf) backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler) _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) (.start backpressure-thread)) + ;; this callback is registered as a zk watch on topology's backpressure directory + ;; which makes sure that the topology's backpressure status is updated to the worker's throttle-on + backpressure-znode-timeout-ms (* (storm-conf BACKPRESSURE-ZNODE-TIMEOUT-SECS) 1000) topology-backpressure-callback (fn cb [& ignored] - (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)] + (let [throttle-on (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms cb)] (reset! (:throttle-on worker) throttle-on))) _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) - (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback)) + (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms topology-backpressure-callback)) shutdown* (fn [] (log-message "Shutting down worker " storm-id " " assignment-id " " port) http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 6b0c868..11f980e 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1561,6 +1561,23 @@ public class Config extends HashMap<String, Object> { public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark"; /** + * How long until the backpressure znode is invalid. + * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc. + * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS. + */ + @isInteger + @isPositiveNumber + public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs"; + + /** + * How often will the data (timestamp) of backpressure znode be updated. + * But if the worker backpressure status (on/off) changes, the znode will be updated anyway. + */ + @isInteger + @isPositiveNumber + public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs"; + + /** * A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format). * Each listed class will be routed cluster related metrics data. * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus. http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index 55b686e..cb5f064 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -319,3 +319,18 @@ (mk-storm-cluster-state {}) (verify-call-times-for mk-distributed-cluster-state 1) (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil)))) + +(deftest test-cluster-state-backpressure + (testing "Test that we can get topology backpressure." + (stubbing [zk/mkdirs nil + zk/mk-client (reify CuratorFramework (^void close [this] nil)) + mk-distributed-cluster-state (reify ClusterState + (get_data [this path watch?] (byte-array 10)) + (register [this callback] nil) + (mkdirs [this path acls] nil) + (node_exists [this path watch?] + (log-message "Running node_exists.") true) + (get_children [this path watch?] '("/foo/bar")))] + (let [cluster-state (mk-storm-cluster-state {})] + (.get_data (mk-distributed-cluster-state) "/foo/bar" false) + (topology-backpressure cluster-state "" 30 nil)))))
