Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch 1daaaefbd -> 3225fd619


Merge branch 'STORM-2786-1.x' of https://github.com/revans2/incubator-storm 
into STORM-2786

STORM-2786: Enable tick tuples for ackers

This closes #2383


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3225fd61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3225fd61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3225fd61

Branch: refs/heads/1.1.x-branch
Commit: 3225fd619f4e6201a49ff835235fa35d262cbda3
Parents: 1daaaef
Author: Robert Evans <[email protected]>
Authored: Wed Oct 25 16:57:22 2017 -0500
Committer: Robert Evans <[email protected]>
Committed: Wed Oct 25 17:04:23 2017 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3225fd61/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 52063fc..66d1851 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -347,21 +347,22 @@
 
 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
+        comp-id (:component-id executor-data)
         tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
         receive-queue (:receive-queue executor-data)
         context (:worker-context executor-data)]
     (when tick-time-secs
-      (if (or (Utils/isSystemId (:component-id executor-data))
+      (if (or (and (not= "__acker" comp-id) (Utils/isSystemId comp-id))
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
-        (log-message "Timeouts disabled for executor " (:component-id 
executor-data) ":" (:executor-id executor-data))
-        (schedule-recurring
-          (:user-timer worker)
-          tick-time-secs
-          tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST 
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID 
Constants/SYSTEM_TICK_STREAM_ID))]]
-              (disruptor/publish receive-queue val))))))))
+        (log-message "Timeouts disabled for executor " comp-id ":" 
(:executor-id executor-data))
+        (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. 
context [tick-time-secs] Constants/SYSTEM_TASK_ID 
Constants/SYSTEM_TICK_STREAM_ID))]]
+          (schedule-recurring
+            (:user-timer worker)
+            tick-time-secs
+            tick-time-secs
+            (fn []
+                (disruptor/publish receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)

Reply via email to