Repository: storm Updated Branches: refs/heads/master fc4ac8e2a -> f37a6bd99
STORM-2912 Revert optimization of sharing tick tuple * since it incurs side effects and messes up metrics Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b918e57 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b918e57 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b918e57 Branch: refs/heads/master Commit: 1b918e5764c67dfc6587c2fc64aae5d792f79a73 Parents: fc4ac8e Author: Jungtaek Lim <[email protected]> Authored: Thu Jan 25 15:56:53 2018 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Jan 25 16:39:24 2018 +0900 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/executor/Executor.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1b918e57/storm-client/src/jvm/org/apache/storm/executor/Executor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index 3c39194..ed900dc 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -387,11 +387,15 @@ public abstract class Executor implements Callable, EventHandler<Object> { LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId); } else { StormTimer timerTask = workerData.getUserTimer(); - TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs), - (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); - final List<AddressedTuple> tickTuple = - Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); - timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> receiveQueue.publish(tickTuple)); + 
timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> { + // We should create a new tick tuple for each recurrence instead of sharing object + // More detail on https://issues.apache.org/jira/browse/STORM-2912 + TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs), + (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); + final List<AddressedTuple> tickTuple = + Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); + receiveQueue.publish(tickTuple); + }); } } }
