STORM-1731 Avoid looking up debug / backpressure enable flags within the critical path
* preload the value of flags and use that value * topology.debug * topology.backpressure.enable * also remove unnecessary lookup: receive-queue Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a15ebc5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a15ebc5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a15ebc5 Branch: refs/heads/1.x-branch Commit: 7a15ebc57300a027f10a65046c85e9d1d0bef4dc Parents: ae78815 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Tue Apr 26 19:29:44 2016 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Tue Apr 26 19:29:44 2016 +0900 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 50 +++++++++----------- 1 file changed, 23 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7a15ebc5/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 9ea4eb4..07925b8 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -214,12 +214,12 @@ ;; in its own function so that it can be mocked out by tracked topologies (defn mk-executor-transfer-fn [batch-transfer->worker storm-conf] - (fn this - [task tuple] - (let [val (AddressedTuple. task tuple)] - (when (= true (storm-conf TOPOLOGY-DEBUG)) - (log-message "TRANSFERING tuple " val)) - (disruptor/publish batch-transfer->worker val)))) + (let [debug? (= true (storm-conf TOPOLOGY-DEBUG))] + (fn this + [task tuple] + (let [val (AddressedTuple. task tuple)] + (when debug? 
(log-message "TRANSFERING tuple " val)) + (disruptor/publish batch-transfer->worker val))))) (defn mk-executor-data [worker executor-id] (let [worker-context (worker-context worker) @@ -426,24 +426,22 @@ (log-message "Shut down executor " component-id ":" (pr-str executor-id))) ))) -(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id] +(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?] (let [^ISpout spout (:object task-data) storm-conf (:storm-conf executor-data) task-id (:task-id task-data)] ;;TODO: need to throttle these when there's lots of failures - (when (= true (storm-conf TOPOLOGY-DEBUG)) + (when debug? (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) -(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id] - (let [storm-conf (:storm-conf executor-data) - ^ISpout spout (:object task-data) +(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] + (let [^ISpout spout (:object task-data) task-id (:task-id task-data)] - (when (= true (storm-conf TOPOLOGY-DEBUG)) - (log-message "SPOUT Acking message " id " " msg-id)) + (when debug? (log-message "SPOUT Acking message " id " " msg-id)) (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta @@ -501,13 +499,14 @@ rand (Random. (Utils/secureRandomLong)) ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue) debug? (= true (storm-conf TOPOLOGY-DEBUG)) + backpressure-enabled? (= true (storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)) pending (RotatingMap. 
2 ;; microoptimize for performance of .size method (reify RotatingMap$ExpiredCallback (expire [this id [task-id spout-id tuple-info start-time-ms]] (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] - (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id) + (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id debug?) )))) tuple-action-fn (fn [task-id ^TupleImpl tuple] (let [stream-id (.getSourceStreamId tuple)] @@ -532,9 +531,9 @@ (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] (condp = stream-id ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id) - spout-id tuple-finished-info time-delta id) + spout-id tuple-finished-info time-delta id debug?) ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id) - spout-id tuple-finished-info time-delta "FAIL-STREAM" id) + spout-id tuple-finished-info time-delta "FAIL-STREAM" id debug?) ))) ;; TODO: on failure, emit tuple to failure stream )))) @@ -590,7 +589,7 @@ (when message-id (ack-spout-msg executor-data task-data message-id {:stream out-stream-id :values values} - (if (sampler) 0) "0:"))) + (if (sampler) 0) "0:" debug?))) (or out-tasks []) ))]] (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) @@ -627,8 +626,7 @@ (let [active? @(:storm-active-atom executor-data) curr-count (.get emitted-count) - backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) - throttle-on (and backpressure-enabled + throttle-on (and backpressure-enabled? 
@(:throttle-on (:worker executor-data))) reached-max-spout-pending (and max-spout-pending (>= (.size pending) max-spout-pending)) @@ -685,12 +683,12 @@ (.put pending key (bit-xor curr id)))) (defmethod mk-threads :bolt [executor-data task-datas initial-credentials] - (let [storm-conf (:storm-conf executor-data) + (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler + open-or-prepare-was-called?]} executor-data execute-sampler (mk-stats-sampler storm-conf) executor-stats (:stats executor-data) - {:keys [storm-conf component-id worker-context transfer-fn report-error sampler - open-or-prepare-was-called?]} executor-data rand (Random. (Utils/secureRandomLong)) + debug? (= true (storm-conf TOPOLOGY-DEBUG)) tuple-action-fn (fn [task-id ^TupleImpl tuple] ;; synchronization needs to be done with a key provided by this bolt, otherwise: @@ -722,15 +720,14 @@ user-context (:user-context task-data) sampler? (sampler) execute-sampler? (execute-sampler) - now (if (or sampler? execute-sampler?) (System/currentTimeMillis)) - receive-queue (:receive-queue executor-data)] + now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] (when sampler? (.setProcessSampleStartTime tuple now)) (when execute-sampler? (.setExecuteSampleStartTime tuple now)) (.execute bolt-obj tuple) (let [delta (tuple-execute-time-delta! tuple)] - (when (= true (storm-conf TOPOLOGY-DEBUG)) + (when debug? (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta)) (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) @@ -808,8 +805,7 @@ (task/send-unanchored task-data ACKER-ACK-STREAM-ID [root (bit-xor id ack-val)]))) - (let [delta (tuple-time-delta! tuple) - debug? (= true (storm-conf TOPOLOGY-DEBUG))] + (let [delta (tuple-time-delta! tuple)] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))