[STORM-537] Removed overcomplicated tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dee91cfa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dee91cfa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dee91cfa

Branch: refs/heads/security
Commit: dee91cfa3f73cdd3da6921593a71ec60ae304091
Parents: 1d251e9
Author: Sergey Tryuber <[email protected]>
Authored: Wed Nov 5 09:02:10 2014 +0300
Committer: Sergey Tryuber <[email protected]>
Committed: Wed Nov 5 09:02:10 2014 +0300

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java | 1 +
 .../storm/messaging/netty_unit_test.clj | 146 -------------------
 2 files changed, 1 insertion(+), 146 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dee91cfa/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index b7db4a3..c516b63 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -142,6 +142,7 @@ public class Client implements IConnection {
 }
 int tried = 0;
+ //setting channel to null to make sure we throw an exception when reconnection fails
 channel = null;
 while (tried <= max_retries) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dee91cfa/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 687c3d5..dac7fe3 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ 
b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj @@ -131,149 +131,3 @@ (.close client) (.close server) (.term context))) - - -(deftest test-reconnect-to-permanently-failed-server - "Tests that if connection to a server already established and server fails, then - Client#connect() throws an exception" - (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") - poison_msg (String. "kill_the_server") - storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" - STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 - STORM-MESSAGING-NETTY-MAX-RETRIES 5 ; just to decrease test duration - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 - STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 - STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 - STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 - ;critical for this test - ; STORM-NETTY-MESSAGE-BATCH-SIZE 1 - } - context (TransportFactory/makeContext storm-conf) - client (.connect context nil "localhost" port) - server_fn (fn [] - (let [server (.bind context nil port) - poll (fn [] - (let [iter (.recv server 0 0) - result ()] - (if (nil? iter) () (iterator-seq iter)) - )) - process_msg (fn [msg] - (let [msg_body (String. (.message msg))] - (if (= poison_msg msg_body) - (do (print "Received a poison...") - true) - (do (is (= dummy_msg msg_body)) - (println (str "Received: " msg_body)) - (Thread/sleep 100) - false)) - )) - ] - (loop [need_exit false] - (when (or (false? need_exit) (nil? need_exit)) - (recur (some true? (map process_msg (poll)))))) - ; (recur (some true? (poll))))) - (.close server) - (println "SERVER CLOSED") - )) - stop_server (fn [server_future] - (.send client task (.getBytes poison_msg)) - (if (= "timeout" (deref server_future 5000 "timeout")) - (do - ;Note, that this does not stop Server thread - ;because of ignoring InterruptedException in Server#recv (what is strange) - (future-cancel server_future) - (throw (RuntimeException. "Error. 
Server didn't stop as we asked it.")) - )) - ) - server_1 (future (server_fn)) - _ (println "Let the client connect to a server initially") - _ (.send client task (.getBytes dummy_msg)) - _ (println "Permanently stopping the server") - _ (stop_server server_1) - _ (println "Sending batch of messages to the dead server") - batch (future (.send client (.iterator [(TaskMessage. task (.getBytes dummy_msg)) - (TaskMessage. task (.getBytes dummy_msg)) - (TaskMessage. task (.getBytes dummy_msg)) - (TaskMessage. task (.getBytes dummy_msg))]))) - _ (is - (thrown-cause-with-msg? ExecutionException #".*Remote address is not reachable\. We will close this client.*" - (deref batch (* 2 (* (get storm-conf STORM-MESSAGING-NETTY-MAX-RETRIES) (get storm-conf STORM-MESSAGING-NETTY-MIN-SLEEP-MS))) "timeout"))) - _ (future-cancel batch) - ] - (.close client) - (.term context))) - -(deftest test-reconnect-to-temporarily-failed-server - "Tests that if connection to a server already established and server TEMPORARILY fails, then - Client#connect() succeeds after several re-tries" - (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") - poison_msg (String. "kill_the_server") - storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" - STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 - STORM-MESSAGING-NETTY-MAX-RETRIES 50 ; important for this test - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 - STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 - STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 - STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 - } - context (TransportFactory/makeContext storm-conf) - client (.connect context nil "localhost" port) - - server_fn (fn [] - (let [server (.bind context nil port) - poll (fn [] - (let [iter (.recv server 0 0) - result ()] - (if (nil? iter) () (iterator-seq iter)) - )) - process_msg (fn [msg] - (let [msg_body (String. 
(.message msg))] - (if (= poison_msg msg_body) - (do (print "Received a poison...") - true) - (do (is (= dummy_msg msg_body)) - (println (str "Received: " msg_body)) - (Thread/sleep 100) - false)) - )) - ] - (loop [need_exit false] - (when (or (false? need_exit) (nil? need_exit)) - (recur (some true? (map process_msg (poll)))))) - ; (recur (some true? (poll))))) - (.close server) - (println "SERVER CLOSED") - )) - stop_server (fn [server_future] - (.send client task (.getBytes poison_msg)) - - (if (= "timeout" (deref server_future 5000 "timeout")) - (do - ;Note, that this does not stop Server thread - ;because of ignoring InterruptedException in Server#recv (what is strange) - (future-cancel server_future) - (throw (RuntimeException. "Error. Server didn't stop as we asked it.")) - )) - ) - server_1 (future (server_fn)) - _ (println "Let the client connect to a server initially") - _ (.send client task (.getBytes dummy_msg)) - _ (println "Closing the server") - _ (stop_server server_1) - _ (println "Connecting to the temporarily dead server") - _ (let [reconnect (future (.send client task (.getBytes dummy_msg))) - _ (print "Sleeping for 10 seconds before resuming the server...") - _ (Thread/sleep 10000) - server_2 (future (server_fn)) - _ (println "RESUMED. Expecting that client will send the message successfully.") - _ (if (= "timeout" (deref reconnect 15000 "timeout")) - (do - (future-cancel reconnect) - "timeout") - ) - ] - (stop_server server_2) - ) - ] - (.close client) - (.term context)))
