Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch 1daaaefbd -> 3225fd619
Merge branch 'STORM-2786-1.x' of https://github.com/revans2/incubator-storm into STORM-2786

STORM-2786: Enable tick tuples for ackers

This closes #2383


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3225fd61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3225fd61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3225fd61

Branch: refs/heads/1.1.x-branch
Commit: 3225fd619f4e6201a49ff835235fa35d262cbda3
Parents: 1daaaef
Author: Robert Evans <[email protected]>
Authored: Wed Oct 25 16:57:22 2017 -0500
Committer: Robert Evans <[email protected]>
Committed: Wed Oct 25 17:04:23 2017 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3225fd61/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 52063fc..66d1851 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -347,21 +347,22 @@
 (defn setup-ticks!
   [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
+        comp-id (:component-id executor-data)
         tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
         receive-queue (:receive-queue executor-data)
         context (:worker-context executor-data)]
     (when tick-time-secs
-      (if (or (Utils/isSystemId (:component-id executor-data))
+      (if (or (and (not= "__acker" comp-id) (Utils/isSystemId comp-id))
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
-        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (schedule-recurring
-          (:user-timer worker)
-          tick-time-secs
-          tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-              (disruptor/publish receive-queue val))))))))
+        (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data))
+        (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+          (schedule-recurring
+            (:user-timer worker)
+            tick-time-secs
+            tick-time-secs
+            (fn []
+              (disruptor/publish receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)
