[STORM-537] A worker reconnects infinitely to another dead worker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1aacccf2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1aacccf2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1aacccf2 Branch: refs/heads/security Commit: 1aacccf286829e9289d86a6ed10b23cb2b21bc47 Parents: 5a46038 Author: Sergey Tryuber <[email protected]> Authored: Wed Oct 29 18:27:56 2014 +0300 Committer: Sergey Tryuber <[email protected]> Committed: Wed Oct 29 18:36:35 2014 +0300 ---------------------------------------------------------------------- .../backtype/storm/messaging/netty/Client.java | 1 + .../storm/messaging/netty_unit_test.clj | 179 +++++++++++++++++-- 2 files changed, 165 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1aacccf2/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index fed684e..3e4c2f6 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -153,6 +153,7 @@ public class Client implements IConnection { if (!future.isSuccess()) { if (null != current) { current.close(); + channel = null; } } else { channel = current; http://git-wip-us.apache.org/repos/asf/storm/blob/1aacccf2/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj index ea7b8dc..b2269ad 100644 --- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj +++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj @@ -15,20 +15,21 @@ ;; limitations under the License. (ns backtype.storm.messaging.netty-unit-test (:use [clojure test]) - (:import [backtype.storm.messaging TransportFactory]) + (:import [backtype.storm.messaging TransportFactory TaskMessage]) + (:import [java.util.concurrent ExecutionException]) (:use [backtype.storm bootstrap testing util])) (bootstrap) -(def port 6700) -(def task 1) +(def port 6700) +(def task 1) (deftest test-basic (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 STORM-MESSAGING-NETTY-MAX-RETRIES 10 - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 @@ -43,14 +44,14 @@ (is (= req_msg (String. (.message resp)))) (.close client) (.close server) - (.term context))) + (.term context))) (deftest test-large-msg - (let [req_msg (apply str (repeat 2048000 'c')) + (let [req_msg (apply str (repeat 2048000 'c')) storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" STORM-MESSAGING-NETTY-BUFFER-SIZE 102400 STORM-MESSAGING-NETTY-MAX-RETRIES 10 - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 @@ -65,21 +66,21 @@ (is (= req_msg (String. (.message resp)))) (.close client) (.close server) - (.term context))) - + (.term context))) + (deftest test-server-delayed (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 STORM-MESSAGING-NETTY-MAX-RETRIES 10 - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 } context (TransportFactory/makeContext storm-conf) client (.connect context nil "localhost" port) - + server (Thread. (fn [] (Thread/sleep 1000) @@ -88,7 +89,7 @@ resp (.next iter)] (is (= task (.task resp))) (is (= req_msg (String. (.message resp)))) - (.close server) + (.close server) ))) _ (.start server) _ (.send client task (.getBytes req_msg)) @@ -101,7 +102,7 @@ (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000 STORM-MESSAGING-NETTY-MAX-RETRIES 10 - STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 @@ -112,7 +113,7 @@ (doseq [num (range 1 100000)] (let [req_msg (str num)] (.send client task (.getBytes req_msg)))) - + (let [resp (ArrayList.) received (atom 0)] (while (< @received (- 100000 1)) @@ -126,7 +127,155 @@ (let [req_msg (str num) resp_msg (String. (.message (.get resp (- num 1))))] (is (= req_msg resp_msg))))) - + (.close client) (.close server) (.term context))) + + +(deftest test-reconnect-to-permanently-failed-server + "Tests that if connection to a server already established and server fails, then + Client#connect() throws an exception" + (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") + poison_msg (String. "kill_the_server") + storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" + STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 + STORM-MESSAGING-NETTY-MAX-RETRIES 5 ; just to decrease test duration + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 + STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 + STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 + ;critical for this test + ; STORM-NETTY-MESSAGE-BATCH-SIZE 1 + } + context (TransportFactory/makeContext storm-conf) + client (.connect context nil "localhost" port) + server_fn (fn [] + (let [server (.bind context nil port) + poll (fn [] + (let [iter (.recv server 0 0) + result ()] + (if (nil? iter) () (iterator-seq iter)) + )) + process_msg (fn [msg] + (let [msg_body (String. (.message msg))] + (if (= poison_msg msg_body) + (do (print "Received a poison...") + true) + (do (is (= dummy_msg msg_body)) + (println (str "Received: " msg_body)) + (Thread/sleep 100) + false)) + )) + ] + (loop [need_exit false] + (when (or (false? need_exit) (nil? need_exit)) + (recur (some true? (map process_msg (poll)))))) + ; (recur (some true? (poll))))) + (.close server) + (println "SERVER CLOSED") + )) + stop_server (fn [server_future] + (.send client task (.getBytes poison_msg)) + (if (= "timeout" (deref server_future 5000 "timeout")) + (do + ;Note, that this does not stop Server thread + ;because of ignoring InterruptedException in Server#recv (what is strange) + (future-cancel server_future) + (throw (RuntimeException. "Error. Server didn't stop as we asked it.")) + )) + ) + server_1 (future (server_fn)) + _ (println "Let the client connect to a server initially") + _ (.send client task (.getBytes dummy_msg)) + _ (println "Permanently stopping the server") + _ (stop_server server_1) + _ (println "Sending batch of messages to the dead server") + batch (future (.send client (.iterator [(TaskMessage. task (.getBytes dummy_msg)) + (TaskMessage. task (.getBytes dummy_msg)) + (TaskMessage. task (.getBytes dummy_msg)) + (TaskMessage. task (.getBytes dummy_msg))]))) + _ (is + (thrown-cause-with-msg? ExecutionException #".*Remote address is not reachable\. We will close this client.*" + (deref batch (* 2 (* (get storm-conf STORM-MESSAGING-NETTY-MAX-RETRIES) (get storm-conf STORM-MESSAGING-NETTY-MIN-SLEEP-MS))) "timeout"))) + _ (future-cancel batch) + ] + (.close client) + (.term context))) + +(deftest test-reconnect-to-temporarily-failed-server + "Tests that if connection to a server already established and server TEMPORARILY fails, then + Client#connect() succeeds after several re-tries" + (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") + poison_msg (String. "kill_the_server") + storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" + STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 + STORM-MESSAGING-NETTY-MAX-RETRIES 50 ; important for this test + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 + STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 + STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 + ;critical for this test + ; STORM-NETTY-MESSAGE-BATCH-SIZE 1 + } + context (TransportFactory/makeContext storm-conf) + client (.connect context nil "localhost" port) + + server_fn (fn [] + (let [server (.bind context nil port) + poll (fn [] + (let [iter (.recv server 0 0) + result ()] + (if (nil? iter) () (iterator-seq iter)) + )) + process_msg (fn [msg] + (let [msg_body (String. (.message msg))] + (if (= poison_msg msg_body) + (do (print "Received a poison...") + true) + (do (is (= dummy_msg msg_body)) + (println (str "Received: " msg_body)) + (Thread/sleep 100) + false)) + )) + ] + (loop [need_exit false] + (when (or (false? need_exit) (nil? need_exit)) + (recur (some true? (map process_msg (poll)))))) + ; (recur (some true? (poll))))) + (.close server) + (println "SERVER CLOSED") + )) + stop_server (fn [server_future] + (.send client task (.getBytes poison_msg)) + + (if (= "timeout" (deref server_future 5000 "timeout")) + (do + ;Note, that this does not stop Server thread + ;because of ignoring InterruptedException in Server#recv (what is strange) + (future-cancel server_future) + (throw (RuntimeException. "Error. Server didn't stop as we asked it.")) + )) + ) + server_1 (future (server_fn)) + _ (println "Let the client connect to a server initially") + _ (.send client task (.getBytes dummy_msg)) + _ (println "Closing the server") + _ (stop_server server_1) + _ (println "Connecting to the temporarily dead server") + _ (let [reconnect (future (.send client task (.getBytes dummy_msg))) + _ (print "Sleeping for 10 seconds before resuming the server...") + _ (Thread/sleep 10000) + server_2 (future (server_fn)) + _ (println "RESUMED. Expecting that client will send the message successfully.") + _ (if (= "timeout" (deref reconnect 15000 "timeout")) + (do + (future-cancel reconnect) + "timeout") + ) + ] + (stop_server server_2) + ) + ] + (.close client) + (.term context)))
