merge fix for STORM-329
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2916315 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2916315 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2916315 Branch: refs/heads/master Commit: c291631563e2ab0eaa7f197dc95b340552bd12f7 Parents: e6a29e0 522d96e Author: P. Taylor Goetz <[email protected]> Authored: Mon Feb 23 14:41:42 2015 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Mon Feb 23 14:41:42 2015 -0500 ---------------------------------------------------------------------- CHANGELOG.md | 1 + conf/defaults.yaml | 2 +- .../src/clj/backtype/storm/daemon/worker.clj | 52 +- .../src/clj/backtype/storm/messaging/local.clj | 2 +- .../storm/messaging/ConnectionWithStatus.java | 32 + .../backtype/storm/messaging/netty/Client.java | 712 ++++++++++++------- .../messaging/netty/SaslStormClientHandler.java | 5 +- .../backtype/storm/messaging/netty/Server.java | 182 +++-- .../netty/StormClientPipelineFactory.java | 5 +- .../storm/messaging/netty_unit_test.clj | 40 +- .../test/clj/backtype/storm/worker_test.clj | 37 + 11 files changed, 737 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/CHANGELOG.md ---------------------------------------------------------------------- diff --cc CHANGELOG.md index 3ed28c0,d0e4a03..7d465dd --- a/CHANGELOG.md +++ b/CHANGELOG.md @@@ -1,7 -1,4 +1,8 @@@ ## 0.10.0 ++ * STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages + * STORM-641: Add total number of topologies to api/v1/cluster/summary. + * STORM-640: Storm UI vulnerable to poodle attack. + * STORM-651: improvements to storm.cmd * STORM-456: Storm UI: cannot navigate to topology page when name contains spaces. * STORM-627: Storm-hbase configuration error. * STORM-248: cluster.xml location is hardcoded for workers http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj index 46ded42,15c6143..dad9354 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@@ -15,23 -15,13 +15,23 @@@ ;; limitations under the License. (ns backtype.storm.daemon.worker (:use [backtype.storm.daemon common]) - (:use [backtype.storm bootstrap]) + (:use [backtype.storm config log util timer]) (:require [backtype.storm.daemon [executor :as executor]]) + (:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]]) + (:require [clojure.set :as set]) + (:require [backtype.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors]) (:import [java.util ArrayList HashMap]) - (:import [backtype.storm.utils TransferDrainer]) + (:import [backtype.storm.utils Utils TransferDrainer ThriftTopologyUtils]) (:import [backtype.storm.messaging TransportFactory]) - (:import [backtype.storm.messaging TaskMessage IContext IConnection]) + (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) + (:import [backtype.storm.daemon.common WorkerHeartbeat]) + (:import [backtype.storm.daemon Shutdownable]) + (:import [backtype.storm.serialization KryoTupleSerializer]) + (:import [backtype.storm.generated StormTopology]) + (:import [backtype.storm.tuple Fields]) + (:import [backtype.storm.task WorkerTopologyContext]) + (:import [backtype.storm Constants]) (:import [backtype.storm.security.auth AuthUtils]) (:import [javax.security.auth Subject]) (:import [java.security PrivilegedExceptionAction]) http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj index 38160f8,b152af2..51d03b1 --- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj +++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj @@@ -16,8 -16,10 +16,9 @@@ (ns backtype.storm.messaging.netty-unit-test (:use [clojure test]) (:import [backtype.storm.messaging TransportFactory]) - (:use [backtype.storm testing util config]) - (:use [backtype.storm bootstrap testing util]) - (:use [backtype.storm.daemon.worker :only [is-connection-ready]])) - -(bootstrap) ++ (:use [backtype.storm testing util config log]) ++ (:use [backtype.storm.daemon.worker :only [is-connection-ready]]) + (:import [java.util ArrayList])) (def port 6700) (def task 1) @@@ -68,39 -94,9 +93,40 @@@ (.close server) (.term context))) +(deftest test-server-delayed + (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") + storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" + STORM-MESSAGING-NETTY-AUTHENTICATION false + STORM-MESSAGING-NETTY-BUFFER-SIZE 1024 + STORM-MESSAGING-NETTY-MAX-RETRIES 10 + STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 + STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 + STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1 + STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 + } + context (TransportFactory/makeContext storm-conf) + client (.connect context nil "localhost" port) + + server (Thread. + (fn [] + (Thread/sleep 1000) + (let [server (.bind context nil port) + iter (.recv server 0 0) + resp (.next iter)] + (is (= task (.task resp))) + (is (= req_msg (String. (.message resp)))) + (.close server) + ))) + _ (.start server) + _ (.send client task (.getBytes req_msg)) + ] + (.close client) + (.join server) + (.term context))) + (deftest test-batch - (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" + (let [num-messages 100000 + storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" STORM-MESSAGING-NETTY-AUTHENTICATION false STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000 STORM-MESSAGING-NETTY-MAX-RETRIES 10 http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/worker_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/backtype/storm/worker_test.clj index 0000000,f09baef..2e0533d mode 000000,100644..100644 --- a/storm-core/test/clj/backtype/storm/worker_test.clj +++ b/storm-core/test/clj/backtype/storm/worker_test.clj @@@ -1,0 -1,38 +1,37 @@@ + ;; Licensed to the Apache Software Foundation (ASF) under one + ;; or more contributor license agreements. See the NOTICE file + ;; distributed with this work for additional information + ;; regarding copyright ownership. The ASF licenses this file + ;; to you under the Apache License, Version 2.0 (the + ;; "License"); you may not use this file except in compliance + ;; with the License. You may obtain a copy of the License at + ;; + ;; http://www.apache.org/licenses/LICENSE-2.0 + ;; + ;; Unless required by applicable law or agreed to in writing, software + ;; distributed under the License is distributed on an "AS IS" BASIS, + ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ;; See the License for the specific language governing permissions and + ;; limitations under the License. + (ns backtype.storm.worker-test + (:use [clojure test]) + (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) + (:import [org.mockito Mockito]) - (:use [backtype.storm bootstrap testing]) ++ (:use [backtype.storm testing]) + (:use [backtype.storm.daemon common]) + + (:require [backtype.storm.daemon [worker :as worker]]) + ) + -(bootstrap) + + (deftest test-worker-is-connection-ready + (let [connection (Mockito/mock ConnectionWithStatus)] + (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready) + (is (= true (worker/is-connection-ready connection))) + + (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting) + (is (= false (worker/is-connection-ready connection))) + + (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed) + (is (= false (worker/is-connection-ready connection))) + ))
