Repository: storm
Updated Branches:
  refs/heads/1.x-branch ef73328df -> c6324246f


STORM-2535: Replace test-reset-timeout with a more reliable test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a41b22b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a41b22b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a41b22b

Branch: refs/heads/1.x-branch
Commit: 6a41b22bd7b633653633b868bcd4ca9cb8b858f5
Parents: c70d7d4
Author: Stig Rohde Døssing <[email protected]>
Authored: Thu May 25 22:32:42 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Sat Nov 11 13:30:56 2017 +0100

----------------------------------------------------------------------
 .../org/apache/storm/integration_test.clj       | 53 ++++++++++++++------
 1 file changed, 37 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a41b22b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 775949e..a147713 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -129,19 +129,25 @@
       (advance-cluster-time cluster 12)
       (assert-failed tracker 2)
       )))
-
-(defbolt extend-timeout-twice {} {:prepare true}
+      
+(defbolt reset-timeout-bolt {} {:prepare true}
   [conf context collector]
-  (let [state (atom -1)]
+  (let [tuple-counter (atom 1)
+        first-tuple (atom nil)]
     (bolt
       (execute [tuple]
-        (do
-          (Time/sleep (* 8 1000))
-          (reset-timeout! collector tuple)
-          (Time/sleep (* 8 1000))
-          (reset-timeout! collector tuple)
-          (Time/sleep (* 8 1000))
-          (ack! collector tuple)
+        (do 
+          (condp = @tuple-counter
+            1 (reset! first-tuple tuple)
+            2 (reset-timeout! collector @first-tuple)
+            5 (do 
+                (ack! collector @first-tuple)
+                (ack! collector tuple)
+              )
+            (do 
+              (reset-timeout! collector @first-tuple)
+              (ack! collector tuple)))
+          (swap! tuple-counter inc)
         )))))
 
 (deftest test-reset-timeout
@@ -151,18 +157,33 @@
           _ (.setAckFailDelegate feeder tracker)
           topology (thrift/mk-topology
                      {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (thrift/mk-bolt-spec {"1" :global} extend-timeout-twice)})]
+                     {"2" (thrift/mk-bolt-spec {"1" :global} reset-timeout-bolt)})]
     (submit-local-topology (:nimbus cluster)
                            "timeout-tester"
                            {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
                            topology)
-    (advance-cluster-time cluster 11)
+    ;The first tuple will be used to check timeout reset
     (.feed feeder ["a"] 1)
-    (advance-cluster-time cluster 21)
+    ;The second tuple is used to wait for the spout to rotate the spout's pending map
+    (.feed feeder ["b"] 2)
+    (advance-cluster-time cluster 9)
+    ;The other tuples are used to reset the first tuple's timeout,
+    ;and to wait for the message to get through to the spout (acks use the same path as timeout resets)
+    (.feed feeder ["c"] 3)
+    (assert-acked tracker 3)
+    (advance-cluster-time cluster 9)
+    (.feed feeder ["d"], 4)
+    (assert-acked tracker 4)
+    (advance-cluster-time cluster 2)
+    ;The time is now twice the message timeout, the second tuple should expire since it was not acked
+    ;Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
+    (assert-failed tracker 2)
+    ;Put in a tuple to cause the first tuple to be acked
+    (.feed feeder ["e"], 5)
+    (assert-acked tracker 5)
+    ;The first tuple should be acked and should not have failed
     (is (not (.isFailed tracker 1)))
-    (is (not (.isAcked tracker 1)))
-    (advance-cluster-time cluster 5)
-    (assert-acked tracker 1)
+    (is (.isAcked tracker 1))
     )))
 
 (defn mk-validate-topology-1 []

Reply via email to