Repository: storm Updated Branches: refs/heads/1.x-branch f0abfff92 -> 53e1ab0c6
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/406052cd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/406052cd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/406052cd Branch: refs/heads/1.x-branch Commit: 406052cdc7138046a79104fc5c6f72212415f7f2 Parents: 5fd3121 Author: Stig Rohde Døssing <[email protected]> Authored: Sun Feb 14 02:39:42 2016 +0100 Committer: Stig Rohde Døssing <[email protected]> Committed: Sun Feb 14 15:17:37 2016 +0100 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/clojure.clj | 3 ++ .../src/clj/org/apache/storm/daemon/acker.clj | 10 +++- .../src/clj/org/apache/storm/daemon/common.clj | 9 +++- .../clj/org/apache/storm/daemon/executor.clj | 10 ++++ .../storm/coordination/CoordinatedBolt.java | 4 ++ .../org/apache/storm/task/IOutputCollector.java | 1 + .../org/apache/storm/task/OutputCollector.java | 10 ++++ .../storm/topology/BasicOutputCollector.java | 4 ++ .../storm/topology/IBasicOutputCollector.java | 2 + .../trident/topology/TridentBoltExecutor.java | 4 ++ .../org/apache/storm/integration_test.clj | 51 ++++++++++++++++++-- 11 files changed, 103 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj index ff33829..6e88cb6 100644 --- a/storm-core/src/clj/org/apache/storm/clojure.clj +++ b/storm-core/src/clj/org/apache/storm/clojure.clj @@ -173,6 +173,9 @@ (defn fail! [collector ^Tuple tuple] (.fail ^OutputCollector (:output-collector collector) tuple)) +(defn reset-timeout! [collector ^Tuple tuple] + (.resetTimeout ^OutputCollector (:output-collector collector) tuple)) + (defn report-error! [collector ^Tuple tuple] (.reportError ^OutputCollector (:output-collector collector) tuple)) http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/acker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj index 7c4d614..7c29f46 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj @@ -30,6 +30,7 @@ (def ACKER-INIT-STREAM-ID "__ack_init") (def ACKER-ACK-STREAM-ID "__ack_ack") (def ACKER-FAIL-STREAM-ID "__ack_fail") +(def ACKER-RESET-TIMEOUT-STREAM-ID "__ack_reset_timeout") (defn- update-ack [curr-entry val] (let [old (get curr-entry :val 0)] @@ -61,7 +62,8 @@ (update-ack (.getValue tuple 1)) (assoc :spout-task (.getValue tuple 2))) ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1)) - ACKER-FAIL-STREAM-ID (assoc curr :failed true))] + ACKER-FAIL-STREAM-ID (assoc curr :failed true) + ACKER-RESET-TIMEOUT-STREAM-ID curr)] (.put pending id curr) (when (and curr (:spout-task curr)) (cond (= 0 (:val curr)) @@ -80,6 +82,12 @@ ACKER-FAIL-STREAM-ID [id] )) + (= stream-id ACKER-RESET-TIMEOUT-STREAM-ID) + (acker-emit-direct output-collector + (:spout-task curr) + ACKER-RESET-TIMEOUT-STREAM-ID + [id] + ) )) (.ack output-collector tuple) )))) http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index c1e261f..55bc030 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -48,6 +48,7 @@ (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID) (def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID) (def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID) +(def ACKER-RESET-TIMEOUT-STREAM-ID acker/ACKER-RESET-TIMEOUT-STREAM-ID) (def SYSTEM-STREAM-ID "__system") @@ -195,7 +196,8 @@ bolt-inputs (apply merge (for [id bolt-ids] {[id ACKER-ACK-STREAM-ID] ["id"] - [id ACKER-FAIL-STREAM-ID] ["id"]} + [id ACKER-FAIL-STREAM-ID] ["id"] + [id ACKER-RESET-TIMEOUT-STREAM-ID] ["id"]} ))] (merge spout-inputs bolt-inputs))) @@ -221,6 +223,7 @@ (new org.apache.storm.daemon.acker) {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"]) ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"]) + ACKER-RESET-TIMEOUT-STREAM-ID (thrift/direct-output-fields ["id"]) } :p num-executors :conf {TOPOLOGY-TASKS num-executors @@ -230,6 +233,7 @@ (do (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"])) (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])) + (.put_to_streams common ACKER-RESET-TIMEOUT-STREAM-ID (thrift/output-fields ["id"])) )) (dofor [[_ spout] (.get_spouts ret) :let [common (.get_common spout) @@ -246,6 +250,9 @@ (.put_to_inputs common (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID) (thrift/mk-direct-grouping)) + (.put_to_inputs common + (GlobalStreamId. ACKER-COMPONENT-ID ACKER-RESET-TIMEOUT-STREAM-ID) + (thrift/mk-direct-grouping)) )) (.put_to_bolts ret "__acker" acker-bolt) )) http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 251387b..0d48548 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -519,6 +519,11 @@ spout-obj (:object task-data)] (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj (.getValue tuple 0)))) + ACKER-RESET-TIMEOUT-STREAM-ID + (let [id (.getValue tuple 0) + pending-for-id (.get pending id)] + (when pending-for-id + (.put pending id pending-for-id))) (let [id (.getValue tuple 0) [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id @@ -828,6 +833,11 @@ (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) + (^void resetTimeout [this ^Tuple tuple] + (fast-list-iter [root (.. tuple getMessageId getAnchors)] + (task/send-unanchored task-data + ACKER-RESET-TIMEOUT-STREAM-ID + [root]))) (reportError [this error] (report-error error) ))))) http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java index ee66b09..15ac5e2 100644 --- a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java +++ b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java @@ -124,6 +124,10 @@ public class CoordinatedBolt implements IRichBolt { checkFinishId(tuple, TupleType.REGULAR); _delegate.fail(tuple); } + + public void resetTimeout(Tuple tuple) { + _delegate.resetTimeout(tuple); + } public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java index cbbe108..cda4d9f 100644 --- a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java @@ -29,4 +29,5 @@ public interface IOutputCollector extends IErrorReporter { void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple); void ack(Tuple input); void fail(Tuple input); + void resetTimeout(Tuple input); } http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java index e6e54ac..071d8aa 100644 --- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java @@ -218,6 +218,16 @@ public class OutputCollector implements IOutputCollector { _delegate.fail(input); } + /** + * Resets the message timeout for any tuple trees to which the given tuple belongs. + * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. + * @param input the tuple to reset timeout for + */ + @Override + public void resetTimeout(Tuple input) { + _delegate.resetTimeout(input); + } + @Override public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java index cedc7c9..343c349 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java @@ -52,6 +52,10 @@ public class BasicOutputCollector implements IBasicOutputCollector { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } + public void resetTimeout(Tuple tuple){ + out.resetTimeout(tuple); + } + protected IOutputCollector getOutputter() { return out; } http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java index 60da48a..7b7c9fc 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java @@ -18,10 +18,12 @@ package org.apache.storm.topology; import org.apache.storm.task.IErrorReporter; +import org.apache.storm.tuple.Tuple; import java.util.List; public interface IBasicOutputCollector extends IErrorReporter{ List<Integer> emit(String streamId, List<Object> tuple); void emitDirect(int taskId, String streamId, List<Object> tuple); + void resetTimeout(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java index d85d217..41feb12 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java @@ -180,6 +180,10 @@ public class TridentBoltExecutor implements IRichBolt { public void fail(Tuple tuple) { throw new IllegalStateException("Method should never be called"); } + + public void resetTimeout(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj index cd2bc26..238e0db 100644 --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@ -20,6 +20,7 @@ (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) + (:import [org.apache.storm.utils Time]) (:import [org.apache.storm.tuple Fields]) (:use [org.apache.storm testing config clojure util]) (:use [org.apache.storm.daemon common]) @@ -83,9 +84,18 @@ (ack! collector tuple) )))))) -(defn assert-loop [afn ids] - (while (not (every? afn ids)) - (Thread/sleep 1))) +(defn assert-loop +([afn ids] (assert-loop afn ids 10)) +([afn ids timeout-secs] + (loop [remaining-time (* timeout-secs 1000)] + (let [start-time (System/currentTimeMillis) + assertion-is-true (every? afn ids)] + (if (or assertion-is-true (neg? remaining-time)) + (is assertion-is-true) + (do + (Thread/sleep 1) + (recur (- remaining-time (- (System/currentTimeMillis) start-time))) + )))))) (defn assert-acked [tracker & ids] (assert-loop #(.isAcked tracker %) ids)) @@ -116,6 +126,41 @@ (assert-failed tracker 2) ))) +(defbolt extend-timeout-twice {} {:prepare true} + [conf context collector] + (let [state (atom -1)] + (bolt + (execute [tuple] + (do + (Time/sleep (* 8 1000)) + (reset-timeout! collector tuple) + (Time/sleep (* 8 1000)) + (reset-timeout! collector tuple) + (Time/sleep (* 8 1000)) + (ack! collector tuple) + ))))) + +(deftest test-reset-timeout + (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec feeder)} + {"2" (thrift/mk-bolt-spec {"1" :global} extend-timeout-twice)})] + (submit-local-topology (:nimbus cluster) + "timeout-tester" + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} + topology) + (advance-cluster-time cluster 11) + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 21) + (is (not (.isFailed tracker 1))) + (is (not (.isAcked tracker 1))) + (advance-cluster-time cluster 5) + (assert-acked tracker 1) + ))) + (defn mk-validate-topology-1 [] (thrift/mk-topology {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
