Repository: storm
Updated Branches:
  refs/heads/1.x-branch 38e308e3a -> 502ed7e7a


STORM-2786: Enable tick tuples for ackers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da041895
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da041895
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da041895

Branch: refs/heads/1.x-branch
Commit: da041895a80d1691d1af95ddc47b9a35999ad917
Parents: 9aff9f9
Author: Robert (Bobby) Evans <[email protected]>
Authored: Mon Oct 23 14:03:37 2017 -0500
Committer: Robert (Bobby) Evans <[email protected]>
Committed: Mon Oct 23 14:03:37 2017 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/da041895/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 52063fc..66d1851 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -347,21 +347,22 @@
 
 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
+        comp-id (:component-id executor-data)
         tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
         receive-queue (:receive-queue executor-data)
         context (:worker-context executor-data)]
     (when tick-time-secs
-      (if (or (Utils/isSystemId (:component-id executor-data))
+      (if (or (and (not= "__acker" comp-id) (Utils/isSystemId comp-id))
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
-        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (schedule-recurring
-          (:user-timer worker)
-          tick-time-secs
-          tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-              (disruptor/publish receive-queue val))))))))
+        (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data))
+        (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+          (schedule-recurring
+            (:user-timer worker)
+            tick-time-secs
+            tick-time-secs
+            (fn []
+                (disruptor/publish receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)

Reply via email to