Repository: storm Updated Branches: refs/heads/1.0.x-branch ec6eef73f -> f57a49848
STORM-2912 Revert optimization of sharing tick tuple * since it incurs side effects and messes up metrics Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f57a4984 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f57a4984 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f57a4984 Branch: refs/heads/1.0.x-branch Commit: f57a498486e852520e2cda80dd694542a37c2479 Parents: ec6eef7 Author: Jungtaek Lim <[email protected]> Authored: Thu Jan 25 15:25:01 2018 +0900 Committer: P. Taylor Goetz <[email protected]> Committed: Fri Jan 26 13:19:01 2018 -0500 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/executor.clj | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f57a4984/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index a630dab..e0c048f 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -356,13 +356,15 @@ (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS)) (= :spout (:type executor-data)))) (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data)) - (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. 
context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] - (schedule-recurring - (:user-timer worker) - tick-time-secs - tick-time-secs - (fn [] - (disruptor/publish receive-queue val)))))))) + (schedule-recurring + (:user-timer worker) + tick-time-secs + tick-time-secs + (fn [] + ;; We should create a new tick tuple for each recurrence instead of sharing object + ;; More detail on https://issues.apache.org/jira/browse/STORM-2912 + (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] + (disruptor/publish receive-queue val)))))))) (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id)
