Repository: storm Updated Branches: refs/heads/1.x-branch c335ad31d -> 458aa1cb6
STORM-2912 Revert optimization of sharing tick tuple * since it incurs side effect and messes metrics Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/923dcc56 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/923dcc56 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/923dcc56 Branch: refs/heads/1.x-branch Commit: 923dcc56097d050d8a9ace86b7842a6e0edd4da0 Parents: c335ad3 Author: Jungtaek Lim <[email protected]> Authored: Thu Jan 25 15:25:01 2018 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Jan 25 16:44:19 2018 +0900 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/executor.clj | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/923dcc56/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index ecbfb14..b9bcaae 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -361,13 +361,15 @@ (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS)) (= :spout (:type executor-data)))) (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data)) - (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] - (schedule-recurring - (:user-timer worker) - tick-time-secs - tick-time-secs - (fn [] - (disruptor/publish receive-queue val)))))))) + (schedule-recurring + (:user-timer worker) + tick-time-secs + tick-time-secs + (fn [] + ;; We should create a new tick tuple for each recurrence instead of sharing object + ;; More detail on https://issues.apache.org/jira/browse/STORM-2912 + (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] + (disruptor/publish receive-queue val)))))))) (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id)
