Repository: storm
Updated Branches:
  refs/heads/master fc4ac8e2a -> f37a6bd99


STORM-2912 Revert optimization of sharing tick tuple

* since it incurs side effects and messes up metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b918e57
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b918e57
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b918e57

Branch: refs/heads/master
Commit: 1b918e5764c67dfc6587c2fc64aae5d792f79a73
Parents: fc4ac8e
Author: Jungtaek Lim <[email protected]>
Authored: Thu Jan 25 15:56:53 2018 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Thu Jan 25 16:39:24 2018 +0900

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/executor/Executor.java   | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1b918e57/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 3c39194..ed900dc 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -387,11 +387,15 @@ public abstract class Executor implements Callable, EventHandler<Object> {
                 LOG.info("Timeouts disabled for executor {}:{}", componentId, 
executorId);
             } else {
                 StormTimer timerTask = workerData.getUserTimer();
-                TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(tickTimeSecs),
-                    (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
-                final List<AddressedTuple> tickTuple =
-                    Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> 
receiveQueue.publish(tickTuple));
+                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> {
+                    // We should create a new tick tuple for each recurrence instead of sharing object
+                    // More detail on https://issues.apache.org/jira/browse/STORM-2912
+                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(tickTimeSecs),
+                            (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
+                    final List<AddressedTuple> tickTuple =
+                            Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+                    receiveQueue.publish(tickTuple);
+                });
             }
         }
     }

Reply via email to