merge fix for STORM-329

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2916315
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2916315
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2916315

Branch: refs/heads/master
Commit: c291631563e2ab0eaa7f197dc95b340552bd12f7
Parents: e6a29e0 522d96e
Author: P. Taylor Goetz <[email protected]>
Authored: Mon Feb 23 14:41:42 2015 -0500
Committer: P. Taylor Goetz <[email protected]>
Committed: Mon Feb 23 14:41:42 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  52 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   2 +-
 .../storm/messaging/ConnectionWithStatus.java   |  32 +
 .../backtype/storm/messaging/netty/Client.java  | 712 ++++++++++++-------
 .../messaging/netty/SaslStormClientHandler.java |   5 +-
 .../backtype/storm/messaging/netty/Server.java  | 182 +++--
 .../netty/StormClientPipelineFactory.java       |   5 +-
 .../storm/messaging/netty_unit_test.clj         |  40 +-
 .../test/clj/backtype/storm/worker_test.clj     |  37 +
 11 files changed, 737 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/CHANGELOG.md
----------------------------------------------------------------------
diff --cc CHANGELOG.md
index 3ed28c0,d0e4a03..7d465dd
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -1,7 -1,4 +1,8 @@@
  ## 0.10.0
++ * STORM-329: fix cascading Storm failure by improving reconnection strategy 
and buffering messages
 + * STORM-641: Add total number of topologies to api/v1/cluster/summary.
 + * STORM-640: Storm UI vulnerable to poodle attack.
 + * STORM-651: improvements to storm.cmd
   * STORM-456: Storm UI: cannot navigate to topology page when name contains 
spaces.
   * STORM-627: Storm-hbase configuration error.
   * STORM-248: cluster.xml location is hardcoded for workers

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index 46ded42,15c6143..dad9354
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -15,23 -15,13 +15,23 @@@
  ;; limitations under the License.
  (ns backtype.storm.daemon.worker
    (:use [backtype.storm.daemon common])
 -  (:use [backtype.storm bootstrap])
 +  (:use [backtype.storm config log util timer])
    (:require [backtype.storm.daemon [executor :as executor]])
 +  (:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]])
 +  (:require [clojure.set :as set])
 +  (:require [backtype.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors])
    (:import [java.util ArrayList HashMap])
 -  (:import [backtype.storm.utils TransferDrainer])
 +  (:import [backtype.storm.utils Utils TransferDrainer ThriftTopologyUtils])
    (:import [backtype.storm.messaging TransportFactory])
-   (:import [backtype.storm.messaging TaskMessage IContext IConnection])
+   (:import [backtype.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
 +  (:import [backtype.storm.daemon.common WorkerHeartbeat])
 +  (:import [backtype.storm.daemon Shutdownable])
 +  (:import [backtype.storm.serialization KryoTupleSerializer])
 +  (:import [backtype.storm.generated StormTopology])
 +  (:import [backtype.storm.tuple Fields])
 +  (:import [backtype.storm.task WorkerTopologyContext])
 +  (:import [backtype.storm Constants])
    (:import [backtype.storm.security.auth AuthUtils])
    (:import [javax.security.auth Subject])
    (:import [java.security PrivilegedExceptionAction])

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 38160f8,b152af2..51d03b1
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -16,8 -16,10 +16,9 @@@
  (ns backtype.storm.messaging.netty-unit-test
    (:use [clojure test])
    (:import [backtype.storm.messaging TransportFactory])
-   (:use [backtype.storm testing util config])
 -  (:use [backtype.storm bootstrap testing util])
 -  (:use [backtype.storm.daemon.worker :only [is-connection-ready]]))
 -
 -(bootstrap)
++  (:use [backtype.storm testing util config log])
++  (:use [backtype.storm.daemon.worker :only [is-connection-ready]])
 +  (:import [java.util ArrayList]))
  
  (def port 6700)
  (def task 1)
@@@ -68,39 -94,9 +93,40 @@@
      (.close server)
      (.term context)))
  
 +(deftest test-server-delayed
 +    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
 +       storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-AUTHENTICATION false
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        client (.connect context nil "localhost" port)
 +
 +        server (Thread.
 +                (fn []
 +                  (Thread/sleep 1000)
 +                  (let [server (.bind context nil port)
 +                        iter (.recv server 0 0)
 +                        resp (.next iter)]
 +                    (is (= task (.task resp)))
 +                    (is (= req_msg (String. (.message resp))))
 +                    (.close server)
 +                  )))
 +        _ (.start server)
 +        _ (.send client task (.getBytes req_msg))
 +        ]
 +    (.close client)
 +    (.join server)
 +    (.term context)))
 +
  (deftest test-batch
-   (let [storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
+   (let [num-messages 100000
+         storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
                      STORM-MESSAGING-NETTY-AUTHENTICATION false
                      STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                      STORM-MESSAGING-NETTY-MAX-RETRIES 10

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/worker_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/worker_test.clj
index 0000000,f09baef..2e0533d
mode 000000,100644..100644
--- a/storm-core/test/clj/backtype/storm/worker_test.clj
+++ b/storm-core/test/clj/backtype/storm/worker_test.clj
@@@ -1,0 -1,38 +1,37 @@@
+ ;; Licensed to the Apache Software Foundation (ASF) under one
+ ;; or more contributor license agreements.  See the NOTICE file
+ ;; distributed with this work for additional information
+ ;; regarding copyright ownership.  The ASF licenses this file
+ ;; to you under the Apache License, Version 2.0 (the
+ ;; "License"); you may not use this file except in compliance
+ ;; with the License.  You may obtain a copy of the License at
+ ;;
+ ;; http://www.apache.org/licenses/LICENSE-2.0
+ ;;
+ ;; Unless required by applicable law or agreed to in writing, software
+ ;; distributed under the License is distributed on an "AS IS" BASIS,
+ ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ;; See the License for the specific language governing permissions and
+ ;; limitations under the License.
+ (ns backtype.storm.worker-test
+   (:use [clojure test])
+   (:import [backtype.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
+   (:import [org.mockito Mockito])
 -  (:use [backtype.storm bootstrap testing])
++  (:use [backtype.storm testing])
+   (:use [backtype.storm.daemon common])
+ 
+   (:require [backtype.storm.daemon [worker :as worker]])
+   )
+ 
 -(bootstrap)
+ 
+ (deftest test-worker-is-connection-ready
+   (let [connection (Mockito/mock ConnectionWithStatus)]
+     (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Ready)
+     (is (= true (worker/is-connection-ready connection)))
+ 
+     (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Connecting)
+     (is (= false (worker/is-connection-ready connection)))
+ 
+     (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Closed)
+     (is (= false (worker/is-connection-ready connection)))
+   ))

Reply via email to