Repository: storm
Updated Branches:
  refs/heads/master e6a29e07c -> d7334849b


STORM-329: fix cascading Storm failure by improving reconnection strategy and 
buffering messages

Thanks to @tedxia for the initial work on this patch, which covered a
lot if not most of the work!


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/205eaf4e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/205eaf4e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/205eaf4e

Branch: refs/heads/master
Commit: 205eaf4ebe28ab5550a842ea9aabd23b41678743
Parents: 8036109
Author: Michael G. Noll <[email protected]>
Authored: Wed Feb 11 19:55:53 2015 +0100
Committer: Michael G. Noll <[email protected]>
Committed: Wed Feb 11 19:55:53 2015 +0100

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  52 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   2 +-
 .../storm/messaging/ConnectionWithStatus.java   |  32 +
 .../backtype/storm/messaging/netty/Client.java  | 711 ++++++++++++-------
 .../messaging/netty/SaslStormClientHandler.java |   5 +-
 .../backtype/storm/messaging/netty/Server.java  | 182 +++--
 .../netty/StormClientPipelineFactory.java       |   5 +-
 .../storm/messaging/netty_unit_test.clj         |  71 +-
 .../test/clj/backtype/storm/worker_test.clj     |  38 +
 10 files changed, 736 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 35d20ff..bc7cb65 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -146,7 +146,7 @@ zmq.hwm: 0
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
-# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 
120, other workers should also wait at least that long before giving up on 
connecting to the other worker.
+# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 
120, other workers should also wait at least that long before giving up on 
connecting to the other worker. The reconnection period also needs to be longer 
than storm.zookeeper.session.timeout (default is 20s), so that we can abort the 
reconnection when the target worker is dead.
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj 
b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 8bba5e4..15c6143 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -21,7 +21,7 @@
   (:import [java.util ArrayList HashMap])
   (:import [backtype.storm.utils TransferDrainer])
   (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.messaging TaskMessage IContext IConnection])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
   (:import [backtype.storm.security.auth AuthUtils])
   (:import [javax.security.auth Subject])
   (:import [java.security PrivilegedExceptionAction])
@@ -217,6 +217,10 @@
       :worker-id worker-id
       :cluster-state cluster-state
       :storm-cluster-state storm-cluster-state
+      ;; When the worker boots up, it starts setting up initial connections to
+      ;; other workers. Once all connections are ready, we enable this flag
+      ;; and the spouts and bolts are activated.
+      :worker-active-flag (atom false)
       :storm-active-atom (atom false)
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
@@ -321,7 +325,7 @@
     (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) 
callback)]
      (reset!
       (:storm-active-atom worker)
-      (= :active (-> base :status :type))
+       (and (= :active (-> base :status :type)) @(:worker-active-flag worker))
       ))
      ))
 
@@ -343,6 +347,37 @@
               (.send drainer node+port->socket)))
           (.clear drainer))))))
 
+;; Check whether this messaging connection is ready to send data
+(defn is-connection-ready [^IConnection connection]
+  (if (instance?  ConnectionWithStatus connection)
+    (let [^ConnectionWithStatus connection connection
+          status (.status connection)]
+      (= status ConnectionWithStatus$Status/Ready))
+    true))
+
+;; all connections are ready
+(defn all-connections-ready [worker]
+    (let [connections (vals @(:cached-node+port->socket worker))]
+      (every? is-connection-ready connections)))
+
+;; When the worker boots up, we wait for all connections to be ready and then
+;; activate the spouts/bolts.
+(defn activate-worker-when-all-connections-ready
+  [worker]
+  (let [timer (:refresh-active-timer worker)
+        delay-secs 0
+        recur-secs 1]
+    (schedule timer
+      delay-secs
+      (fn this []
+        (if (all-connections-ready worker)
+          (do
+            (log-message "All connections are ready for worker " 
(:assignment-id worker) ":" (:port worker)
+              " with id "(:worker-id worker))
+            (reset! (:worker-active-flag worker) true))
+          (schedule timer recur-secs this :check-active false)
+            )))))
+
 (defn launch-receive-thread [worker]
   (log-message "Launching receive-thread for " (:assignment-id worker) ":" 
(:port worker))
   (msg-loader/launch-receive-thread!
@@ -395,21 +430,26 @@
         ;; do this here so that the worker process dies if this fails
         ;; it's important that worker heartbeat to supervisor ASAP when 
launching so that the supervisor knows it's running (and can move on)
         _ (heartbeat-fn)
- 
+
         executors (atom nil)
         ;; launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
         ;; to the supervisor
         _ (schedule-recurring (:heartbeat-timer worker) 0 (conf 
WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
         _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf 
TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors 
@executors))
 
+        receive-thread-shutdown (launch-receive-thread worker)
+
         refresh-connections (mk-refresh-connections worker)
 
         _ (refresh-connections nil)
+
+        _ (activate-worker-when-all-connections-ready worker)
+
         _ (refresh-storm-active worker nil)
- 
+
+
         _ (reset! executors (dofor [e (:executors worker)] 
(executor/mk-executor worker e initial-credentials)))
-        receive-thread-shutdown (launch-receive-thread worker)
-        
+
         transfer-tuples (mk-transfer-tuples-handler worker)
         
         transfer-thread (disruptor/consume-loop* (:transfer-queue worker) 
transfer-tuples)                                       

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj 
b/storm-core/src/clj/backtype/storm/messaging/local.clj
index 801f22d..4aa67ab 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -70,4 +70,4 @@
 (defn mk-context [] 
   (let [context  (LocalContext. nil nil)]
     (.prepare ^IContext context nil)
-    context))
\ No newline at end of file
+    context))

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java 
b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
new file mode 100644
index 0000000..38abc19
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
@@ -0,0 +1,32 @@
+package backtype.storm.messaging;
+
+public abstract class ConnectionWithStatus implements IConnection {
+
+  public static enum Status {
+
+    /**
+     * We are establishing an active connection with the target host. New data
+     * sending requests can be buffered for future sending, or dropped (e.g. 
when
+     * there is not enough memory). This varies between different IConnection
+     * implementations.
+     */
+    Connecting,
+
+    /**
+     * We have a live connection channel, which can be used to transfer data.
+     */
+    Ready,
+
+    /**
+     * The connection channel is closed or being closed. We don't accept 
further
+     * data sending or receiving. All data sending requests will be dropped.
+     */
+    Closed
+  };
+
+  /**
+   * whether this connection is available to transfer data
+   */
+  public abstract Status status();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java 
b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index d770481..5d99718 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -24,15 +24,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.*;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -42,344 +42,577 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
 
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty 
server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed 
asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the 
expense of network latency.  The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *     - Note: The current implementation drops any messages that are being 
enqueued for sending if the connection to
+ *       the remote destination is currently unavailable.
+ * - A flusher thread is run in the background.  It will, at fixed 
intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote 
destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
+    private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-    
-    private AtomicInteger totalReconnects;
-    private AtomicInteger messagesSent;
-    private AtomicInteger messagesLostReconnect;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private boolean closing;
-
-    private int messageBatchSize;
-    
-    private AtomicLong pendings;
-    
-    Map storm_conf;
+    private final InetSocketAddress dstAddress;
+    protected final String dstAddressPrefixedName;
+
+    /**
+     * The channel used for all write operations from this client to the 
remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new 
AtomicReference<Channel>(null);
+
+
+    /**
+     * Maximum number of reconnection attempts we will perform after a 
disconnect before giving up.
+     */
+    private final int maxReconnectionAttempts;
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * This flag is set to true if and only if a client instance is being 
closed.
+     */
+    private volatile boolean closing = false;
+
+    /**
+     * When set to true, then the background flusher thread will flush any 
pending messages on its next run.
+     */
+    private final AtomicBoolean backgroundFlushingEnabled = new 
AtomicBoolean(false);
+
+    /**
+     * The absolute time (in ms) when the next background flush should be 
performed.
+     *
+     * Note: The flush operation will only be performed if 
backgroundFlushingEnabled is true, too.
+     */
+    private final AtomicLong nextBackgroundFlushTimeMs = new 
AtomicLong(DISTANT_FUTURE_TIME_MS);
+
+    /**
+     * The time interval (in ms) at which the background flusher thread will 
be run to check for any pending messages
+     * to be flushed.
+     */
+    private final int flushCheckIntervalMs;
+
+    /**
+     * How many messages should be batched together before sending them to the 
remote destination.
+     *
+     * Messages are batched to optimize network throughput at the expense of 
latency.
+     */
+    private final int messageBatchSize;
 
     private MessageBatch messageBatch = null;
-    private AtomicLong flushCheckTimer;
-    private int flushCheckInterval;
-    private ScheduledExecutorService scheduler;
+    private final ListeningScheduledExecutorService scheduler;
+    protected final Map stormConf;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, 
-            ScheduledExecutorService scheduler, String host, int port) {
-       this.storm_conf = storm_conf;
-        this.factory = factory;
-        this.scheduler = scheduler;
-        channelRef = new AtomicReference<Channel>(null);
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService 
scheduler, String host, int port) {
         closing = false;
-        pendings = new AtomicLong(0);
-        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
-        totalReconnects = new AtomicInteger(0);
-        messagesSent = new AtomicInteger(0);
-        messagesLostReconnect = new AtomicInteger(0);
-
-        // Configure
-        buffer_size = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
-        base_sleep_ms = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, 
max_sleep_ms, max_retries);
-
-        this.messageBatchSize = 
Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
-        
-        flushCheckInterval = 
Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); 
// default 10 ms
-
-        LOG.info("New Netty Client, connect to " + host + ", " + port
-                + ", config: " + ", buffer_size: " + buffer_size);
-
-        bootstrap = new ClientBootstrap(factory);
+        this.stormConf = stormConf;
+        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
+        int bufferSize = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", 
host, port, bufferSize);
+        messageBatchSize = 
Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        flushCheckIntervalMs = 
Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+
+        maxReconnectionAttempts = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        int minWaitMs = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        int maxWaitMs = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, 
maxWaitMs, maxReconnectionAttempts);
+
+        // Initiate connection to remote destination
+        bootstrap = createClientBootstrap(factory, bufferSize);
+        dstAddress = new InetSocketAddress(host, port);
+        dstAddressPrefixedName = prefixedName(dstAddress);
+        connect(NO_DELAY_MS);
+
+        // Launch background flushing thread
+        pauseBackgroundFlushing();
+        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * 
maxReconnectionAttempts);
+        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), 
initialDelayMs, flushCheckIntervalMs,
+            TimeUnit.MILLISECONDS);
+    }
+
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int 
bufferSize) {
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", buffer_size);
+        bootstrap.setOption("sendBufferSize", bufferSize);
         bootstrap.setOption("keepAlive", true);
-
-        // Set up the pipeline factory.
         bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+        return bootstrap;
+    }
 
-        // Start the connection attempt.
-        remote_addr = new InetSocketAddress(host, port);
-        
-        // setup the connection asyncly now
-        scheduler.execute(new Runnable() {
-            @Override
-            public void run() {   
-                connect();
-            }
-        });
-        
-        Runnable flusher = new Runnable() {
+    private String prefixedName(InetSocketAddress dstAddress) {
+        if (null != dstAddress) {
+            return PREFIX + dstAddress.toString();
+        }
+        return "";
+    }
+
+    private Runnable createBackgroundFlusher() {
+        return new Runnable() {
             @Override
             public void run() {
-
-                if(!closing) {
-                    long flushCheckTime = flushCheckTimer.get();
-                    long now = System.currentTimeMillis();
-                    if (now > flushCheckTime) {
-                        Channel channel = channelRef.get();
-                        if (null != channel && channel.isWritable()) {
-                            flush(channel);
-                        }
-                    }
+                if(!closing && backgroundFlushingEnabled.get() && nowMillis() 
> nextBackgroundFlushTimeMs.get()) {
+                    LOG.debug("flushing {} pending messages to {} in 
background", messageBatch.size(),
+                        dstAddressPrefixedName);
+                    flushPendingMessages();
                 }
-                
             }
         };
-        
-        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); 
//max wait for 30s
-        scheduler.scheduleWithFixedDelay(flusher, initialDelay, 
flushCheckInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void pauseBackgroundFlushing() {
+        backgroundFlushingEnabled.set(false);
+    }
+
+    private void resumeBackgroundFlushing() {
+        backgroundFlushingEnabled.set(true);
+    }
+
+    private synchronized void flushPendingMessages() {
+        Channel channel = channelRef.get();
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel)) {
+                if (channel.isWritable()) {
+                    pauseBackgroundFlushing();
+                    MessageBatch toBeFlushed = messageBatch;
+                    flushMessages(channel, toBeFlushed);
+                    messageBatch = null;
+                }
+                else if (closing) {
+                    // Ensure background flushing is enabled so that we 
definitely have a chance to re-try the flush
+                    // operation in case the client is being gracefully closed 
(where we have a brief time window where
+                    // the client will wait for pending messages to be sent).
+                    resumeBackgroundFlushing();
+                }
+            }
+            else {
+                closeChannelAndReconnect(channel);
+            }
+        }
+    }
+
+    private long nowMillis() {
+        return System.currentTimeMillis();
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    private synchronized void connect() {
+    private synchronized void connect(long delayMs) {
         try {
+            if (closing) {
+                return;
+            }
 
-            Channel channel = channelRef.get();
-            if (channel != null && channel.isConnected()) {
+            if (connectionEstablished(channelRef.get())) {
                 return;
             }
 
-            int tried = 0;
-            //setting channel to null to make sure we throw an exception when 
reconnection fails
-            channel = null;
-            while (tried <= max_retries) {
-
-                LOG.info("Reconnect started for {}... [{}]", name(), tried);
-                LOG.debug("connection started...");
-
-                totalReconnects.getAndIncrement();
-                ChannelFuture future = bootstrap.connect(remote_addr);
-                future.awaitUninterruptibly();
-                Channel current = future.getChannel();
-                if (!future.isSuccess()) {
-                    if (null != current) {
-                        current.close();
+            connectionAttempts.getAndIncrement();
+            if (reconnectingAllowed()) {
+                totalConnectionAttempts.getAndIncrement();
+                LOG.info("connection attempt {} to {} scheduled to run in {} 
ms", connectionAttempts.get(),
+                    dstAddressPrefixedName, delayMs);
+                ListenableFuture<Channel> channelFuture = scheduler.schedule(
+                    new Connector(dstAddress, connectionAttempts.get()), 
delayMs, TimeUnit.MILLISECONDS);
+                Futures.addCallback(channelFuture, new 
FutureCallback<Channel>() {
+                    @Override public void onSuccess(Channel result) {
+                        if (connectionEstablished(result)) {
+                            setChannel(result);
+                            LOG.info("connection established to {}", 
dstAddressPrefixedName);
+                            connectionAttempts.set(0);
+                        }
+                        else {
+                            reconnectAgain(new RuntimeException("Returned 
channel was actually not established"));
+                        }
+                    }
+
+                    @Override public void onFailure(Throwable t) {
+                        reconnectAgain(t);
                     }
-                } else {
-                    channel = current;
-                    break;
-                }
-                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
-                tried++;  
+
+                    private void reconnectAgain(Throwable t) {
+                        String baseMsg = String.format("connection attempt %s 
to %s failed", connectionAttempts,
+                            dstAddressPrefixedName);
+                        String failureMsg = (t == null)? baseMsg : baseMsg + 
": " + t.toString();
+                        LOG.error(failureMsg);
+                        long nextDelayMs = 
retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+                        connect(nextDelayMs);
+                    }
+                });
             }
-            if (null != channel) {
-                LOG.info("connection established to a remote host " + name() + 
", " + channel.toString());
-                channelRef.set(channel);
-            } else {
+            else {
                 close();
-                throw new RuntimeException("Remote address is not reachable. 
We will close this client " + name());
+                throw new RuntimeException("Giving up to connect to " + 
dstAddressPrefixedName + " after " +
+                    connectionAttempts + " failed attempts");
             }
-        } catch (InterruptedException e) {
-            throw new RuntimeException("connection failed " + name(), e);
         }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to connect to " + 
dstAddressPrefixedName, e);
+        }
+    }
+
+    private void setChannel(Channel channel) {
+        channelRef.set(channel);
+    }
+
+    private boolean reconnectingAllowed() {
+        return !closing && connectionAttempts.get() <= 
(maxReconnectionAttempts + 1);
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+        // Because we are using TCP (which is a connection-oriented transport 
unlike UDP), a connection is only fully
+        // established iff the channel is connected.  That is, a TCP-based 
channel must be in the CONNECTED state before
+        // anything can be read or written to the channel.
+        //
+        // See:
+        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+        // - 
http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+        return channel != null && channel.isConnected();
     }
 
     /**
-     * Enqueue task messages to be sent to server
+     * Note:  Storm will check via this method whether a worker can be 
activated safely during the initial startup of a
+     * topology.  The worker will only be activated once all of its 
connections are ready.
      */
-    synchronized public void send(Iterator<TaskMessage> msgs) {
+    @Override
+    public Status status() {
+        if (closing) {
+            return Status.Closed;
+        }
+        else if (!connectionEstablished(channelRef.get())) {
+            return Status.Connecting;
+        }
+        else {
+            return Status.Ready;
+        }
+    }
+
+    /**
+     * Receiving messages is not supported by a client.
+     *
+     * @throws java.lang.UnsupportedOperationException whenever this method is 
being called.
+     */
+    @Override
+    public Iterator<TaskMessage> recv(int flags, int clientId) {
+        throw new UnsupportedOperationException("Client connection should not 
receive any messages");
+    }
 
-        // throw exception if the client is being closed
+    @Override
+    public void send(int taskId, byte[] payload) {
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
+    }
+
+    /**
+     * Enqueue task messages to be sent to the remote destination (cf. `host` 
and `port`).
+     */
+    @Override
+    public synchronized void send(Iterator<TaskMessage> msgs) {
         if (closing) {
-            throw new RuntimeException("Client is being closed, and does not 
take requests any more");
+            int numMessages = iteratorSize(msgs);
+            LOG.warn("discarding {} messages because the Netty client to {} is 
being closed", numMessages,
+                dstAddressPrefixedName);
+            return;
         }
-        
-        if (null == msgs || !msgs.hasNext()) {
+
+        if (!hasMessages(msgs)) {
             return;
         }
 
         Channel channel = channelRef.get();
-        if (null == channel) {
-            connect();
-            channel = channelRef.get();
+        if (!connectionEstablished(channel)) {
+            // Closing the channel and reconnecting should be done before 
handling the messages.
+            closeChannelAndReconnect(channel);
+            handleMessagesWhenConnectionIsUnavailable(msgs);
+            return;
         }
 
+        // Collect messages into batches (to optimize network throughput), 
then flush them.
         while (msgs.hasNext()) {
-            if (!channel.isConnected()) {
-                connect();
-                channel = channelRef.get();
-            }
             TaskMessage message = msgs.next();
-            if (null == messageBatch) {
+            if (messageBatch == null) {
                 messageBatch = new MessageBatch(messageBatchSize);
             }
 
             messageBatch.add(message);
             if (messageBatch.isFull()) {
                 MessageBatch toBeFlushed = messageBatch;
-                flushRequest(channel, toBeFlushed);
+                flushMessages(channel, toBeFlushed);
                 messageBatch = null;
             }
         }
 
-        if (null != messageBatch && !messageBatch.isEmpty()) {
-            if (channel.isWritable()) {
-                flushCheckTimer.set(Long.MAX_VALUE);
-                
-                // Flush as fast as we can to reduce the latency
+        // Handle any remaining messages in case the "last" batch was not full.
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel) && channel.isWritable()) {
+                // We can write to the channel, so we flush the remaining 
messages immediately to minimize latency.
+                pauseBackgroundFlushing();
                 MessageBatch toBeFlushed = messageBatch;
                 messageBatch = null;
-                flushRequest(channel, toBeFlushed);
-                
-            } else {
-                // when channel is NOT writable, it means the internal netty 
buffer is full. 
-                // In this case, we can try to buffer up more incoming 
messages.
-                flushCheckTimer.set(System.currentTimeMillis() + 
flushCheckInterval);
+                flushMessages(channel, toBeFlushed);
+            }
+            else {
+                // We cannot write to the channel, which means Netty's 
internal write buffer is full.
+                // In this case, we buffer the remaining messages and wait for 
the next messages to arrive.
+                //
+                // Background:
+                // Netty 3.x maintains an internal write buffer with a high 
water mark for each channel (default: 64K).
+                // This represents the amount of data waiting to be flushed to 
operating system buffers.  If the
+                // outstanding data exceeds this value then the channel is set 
to non-writable.  When this happens, an
+                // INTEREST_CHANGED channel event is triggered.  Netty sets 
the channel to writable again once the data
+                // has been flushed to the system buffers.
+                //
+                // See http://stackoverflow.com/questions/14049260
+                resumeBackgroundFlushing();
+                nextBackgroundFlushTimeMs.set(nowMillis() + 
flushCheckIntervalMs);
             }
         }
 
     }
 
-    public String name() {
-        if (null != remote_addr) {
-            return PREFIX + remote_addr.toString();
-        }
-        return "";
+    private boolean hasMessages(Iterator<TaskMessage> msgs) {
+        return msgs != null && msgs.hasNext();
     }
 
-    private synchronized void flush(Channel channel) {
-        if (!closing) {
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                flushCheckTimer.set(Long.MAX_VALUE);
-                flushRequest(channel, toBeFlushed);
-                messageBatch = null;
+    /**
+     * We will drop pending messages and let at-least-once message replay kick 
in.
+     *
+     * Another option would be to buffer the messages in memory.  But this 
option has the risk of causing OOM errors,
+     * especially for topologies that disable message acking, because we don't 
know whether the connection recovery will
+     * succeed or not, and how long the recovery will take.
+     */
+    private void 
handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
+        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+        dropPendingMessages(msgs);
+    }
+
+    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+        // We consume the iterator by traversing and thus "emptying" it.
+        int msgCount = iteratorSize(msgs);
+        LOG.error("dropping {} pending message(s) destined for {}", msgCount, 
dstAddressPrefixedName);
+    }
+
+    private int iteratorSize(Iterator<TaskMessage> msgs) {
+        int size = 0;
+        if (msgs != null) {
+            while (msgs.hasNext()) {
+                size++;
+                msgs.next();
             }
         }
+        return size;
     }
-    
+
     /**
-     * gracefully close this client.
-     * 
-     * We will send all existing requests, and then invoke close_n_release()
-     * method
+     * Asynchronously writes the message batch to the channel.
+     *
+     * If the write operation fails, then we will close the channel and 
trigger a reconnect.
      */
-    public synchronized void close() {
-        if (!closing) {
-            closing = true;
-            LOG.info("Closing Netty Client " + name());
-            
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                Channel channel = channelRef.get();
-                if (channel != null) {
-                    flushRequest(channel, toBeFlushed);
+    private synchronized void flushMessages(Channel channel, final 
MessageBatch batch) {
+        if (!containsMessages(batch)) {
+            return;
+        }
+
+        final int numMessages = batch.size();
+        pendingMessages.getAndAdd(numMessages);
+        LOG.debug("writing {} messages to channel {}", batch.size(), 
channel.toString());
+        ChannelFuture future = channel.write(batch);
+        future.addListener(new ChannelFutureListener() {
+
+            public void operationComplete(ChannelFuture future) throws 
Exception {
+                pendingMessages.getAndAdd(0 - numMessages);
+                if (future.isSuccess()) {
+                    LOG.debug("sent {} messages to {}", numMessages, 
dstAddressPrefixedName);
+                    messagesSent.getAndAdd(batch.size());
+                }
+                else {
+                    LOG.warn("failed to send {} messages to {}: {}", 
numMessages, dstAddressPrefixedName,
+                        future.getCause());
+                    closeChannelAndReconnect(future.getChannel());
+                    messagesLost.getAndAdd(numMessages);
                 }
-                messageBatch = null;
-            }
-        
-            //wait for pendings to exit
-            final long timeoutMilliSeconds = 600 * 1000; //600 seconds
-            final long start = System.currentTimeMillis();
-            
-            LOG.info("Waiting for pending batchs to be sent with "+ name() + 
"..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
-            
-            while(pendings.get() != 0) {
-                try {
-                    long delta = System.currentTimeMillis() - start;
-                    if (delta > timeoutMilliSeconds) {
-                        LOG.error("Timeout when sending pending batchs with 
{}..., there are still {} pending batchs not sent", name(), pendings.get());
-                        break;
-                    }
-                    Thread.sleep(1000); //sleep 1s
-                } catch (InterruptedException e) {
-                    break;
-                } 
             }
-            
-            close_n_release();
-        }
+
+        });
     }
 
-    /**
-     * close_n_release() is invoked after all messages have been sent.
-     */
-    private void close_n_release() {
-        if (channelRef.get() != null) {
-            channelRef.get().close();
-            LOG.debug("channel {} closed",remote_addr);
+    private synchronized void closeChannelAndReconnect(Channel channel) {
+        if (channel != null) {
+            channel.close();
+            if (channelRef.compareAndSet(channel, null)) {
+                connect(NO_DELAY_MS);
+            }
         }
     }
 
-    @Override
-    public Iterator<TaskMessage> recv(int flags, int clientId) {
-        throw new RuntimeException("Client connection should not receive any 
messages");
+    private boolean containsMessages(MessageBatch batch) {
+        return batch != null && !batch.isEmpty();
     }
 
+    /**
+     * Gracefully close this client.
+     *
+     * We will attempt to send any pending messages (i.e. messages currently 
buffered in memory) before closing the
+     * client.
+     */
     @Override
-    public void send(int taskId, byte[] payload) {
-        TaskMessage msg = new TaskMessage(taskId, payload);
-        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
-        wrapper.add(msg);
-        send(wrapper.iterator());
+    public void close() {
+        if (!closing) {
+            LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+            // Set closing to true to prevent any further reconnection 
attempts.
+            closing = true;
+            flushPendingMessages();
+            waitForPendingMessagesToBeSent();
+            closeChannel();
+        }
     }
 
-    private void flushRequest(Channel channel, final MessageBatch requests) {
-        if (requests == null)
-            return;
-
-        pendings.getAndAdd(requests.size());
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-
-                pendings.getAndAdd(0-requests.size());
-                if (!future.isSuccess()) {
-                    LOG.info(
-                            "failed to send requests to " + 
remote_addr.toString() + ": ", future.getCause());
-
-                    Channel channel = future.getChannel();
-
-                    if (null != channel) {
-                        channel.close();
-                        channelRef.compareAndSet(channel, null);
-                    }
-                    messagesLostReconnect.getAndAdd(requests.size());
-                } else {
-                    messagesSent.getAndAdd(requests.size());
-                    LOG.debug("{} request(s) sent", requests.size());
+    private synchronized void waitForPendingMessagesToBeSent() {
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+            PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), 
dstAddressPrefixedName);
+        long totalPendingMsgs = pendingMessages.get();
+        long startMs = nowMillis();
+        while (pendingMessages.get() != 0) {
+            try {
+                long deltaMs = nowMillis() - startMs;
+                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                    LOG.error("failed to send all pending messages to {} 
within timeout, {} of {} messages were not " +
+                        "sent", dstAddressPrefixedName, pendingMessages.get(), 
totalPendingMsgs);
+                    break;
                 }
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
             }
-        });
+            catch (InterruptedException e) {
+                break;
+            }
+        }
+
+    }
+
+    private synchronized void closeChannel() {
+        if (channelRef.get() != null) {
+            channelRef.get().close();
+            LOG.debug("channel to {} closed", dstAddressPrefixedName);
+        }
     }
 
     @Override
     public Object getState() {
-        LOG.info("Getting metrics for connection to "+remote_addr);
+        LOG.info("Getting metrics for client connection to {}", 
dstAddressPrefixedName);
         HashMap<String, Object> ret = new HashMap<String, Object>();
-        ret.put("reconnects", totalReconnects.getAndSet(0));
+        ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
         ret.put("sent", messagesSent.getAndSet(0));
-        ret.put("pending", pendings.get());
-        ret.put("lostOnSend", messagesLostReconnect.getAndSet(0));
-        ret.put("dest", remote_addr.toString());
+        ret.put("pending", pendingMessages.get());
+        ret.put("lostOnSend", messagesLost.getAndSet(0));
+        ret.put("dest", dstAddress.toString());
+        String src = srcAddressName();
+        if (src != null) {
+            ret.put("src", src);
+        }
+        return ret;
+    }
+
+    private String srcAddressName() {
+        String name = null;
         Channel c = channelRef.get();
         if (c != null) {
             SocketAddress address = c.getLocalAddress();
             if (address != null) {
-              ret.put("src", address.toString());
+                name = address.toString();
             }
         }
-        return ret;
+        return name;
+    }
+
+    @Override public String toString() {
+        return String.format("Netty client for connecting to %s", 
dstAddressPrefixedName);
     }
-}
 
+    /**
+     * Asynchronously establishes a Netty connection to the remote address, 
returning a Netty Channel on success.
+     */
+    private class Connector implements Callable<Channel> {
+
+        private final InetSocketAddress address;
+        private final int connectionAttempt;
+
+        public Connector(InetSocketAddress address, int connectionAttempt) {
+            this.address = address;
+            if (connectionAttempt < 1) {
+                throw new IllegalArgumentException("connection attempt must be 
>= 1 (you provided " +
+                    connectionAttempt + ")");
+            }
+            this.connectionAttempt = connectionAttempt;
+        }
+
+        @Override public Channel call() throws Exception {
+            LOG.debug("connecting to {} [attempt {}]", address.toString(), 
connectionAttempt);
+            Channel channel = null;
+            ChannelFuture future = bootstrap.connect(address);
+            future.awaitUninterruptibly();
+            Channel current = future.getChannel();
+
+            if (future.isSuccess() && connectionEstablished(current)) {
+                channel = current;
+                LOG.debug("successfully connected to {}, {} [attempt {}]", 
address.toString(), channel.toString(),
+                    connectionAttempt);
+            }
+            else {
+                LOG.debug("failed to connect to {} [attempt {}]", 
address.toString(), connectionAttempt);
+                if (current != null) {
+                    current.close();
+                }
+            }
+            return channel;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java 
b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index f94cbc3..32ecb40 100644
--- 
a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ 
b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -146,9 +146,8 @@ public class SaslStormClientHandler extends 
SimpleChannelUpstreamHandler {
     }
 
     private void getSASLCredentials() throws IOException {
-        topologyName = (String) this.client.storm_conf
-                .get(Config.TOPOLOGY_NAME);
-        String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+        topologyName = (String) 
this.client.stormConf.get(Config.TOPOLOGY_NAME);
+        String secretKey = SaslUtils.getSecretKey(this.client.stormConf);
         if (secretKey != null) {
             token = secretKey.getBytes();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java 
b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index d1f10e1..e984144 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -40,12 +40,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.Utils;
 
-class Server implements IConnection, IStatefulObject {
+class Server extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
@@ -67,7 +69,7 @@ class Server implements IConnection, IStatefulObject {
     private volatile HashMap<Integer, Integer> taskToQueueId = null;
     int roundRobinQueueId;
        
-    boolean closing = false;
+    private volatile boolean closing = false;
     List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
     
     
@@ -120,45 +122,45 @@ class Server implements IConnection, IStatefulObject {
     }
     
     private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
-      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
-      
-      for (int i = 0; i < msgs.size(); i++) {
-        TaskMessage message = msgs.get(i);
-        int task = message.task();
-        
-        if (task == -1) {
-          closing = true;
-          return null;
-        }
-        
-        Integer queueId = getMessageQueueId(task);
-        
-        if (null == messageGroups[queueId]) {
-          messageGroups[queueId] = new ArrayList<TaskMessage>();
+        ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+
+        for (int i = 0; i < msgs.size(); i++) {
+            TaskMessage message = msgs.get(i);
+            int task = message.task();
+
+            if (task == -1) {
+                closing = true;
+                return null;
+            }
+
+            Integer queueId = getMessageQueueId(task);
+
+            if (null == messageGroups[queueId]) {
+                messageGroups[queueId] = new ArrayList<TaskMessage>();
+            }
+            messageGroups[queueId].add(message);
         }
-        messageGroups[queueId].add(message);
-      }
-      return messageGroups;
+        return messageGroups;
     }
     
     private Integer getMessageQueueId(int task) {
-      // try to construct the map from taskId -> queueId in round robin manner.
-      Integer queueId = taskToQueueId.get(task);
-      if (null == queueId) {
-        synchronized (this) {
-          queueId = taskToQueueId.get(task);
-          if (queueId == null) {
-            queueId = roundRobinQueueId++;
-            if (roundRobinQueueId == queueCount) {
-              roundRobinQueueId = 0;
+        // try to construct the map from taskId -> queueId in round robin 
manner.
+        Integer queueId = taskToQueueId.get(task);
+        if (null == queueId) {
+            synchronized (this) {
+                queueId = taskToQueueId.get(task);
+                if (queueId == null) {
+                    queueId = roundRobinQueueId++;
+                    if (roundRobinQueueId == queueCount) {
+                        roundRobinQueueId = 0;
+                    }
+                    HashMap<Integer, Integer> newRef = new HashMap<Integer, 
Integer>(taskToQueueId);
+                    newRef.put(task, queueId);
+                    taskToQueueId = newRef;
+                }
             }
-            HashMap<Integer, Integer> newRef = new HashMap<Integer, 
Integer>(taskToQueueId);
-            newRef.put(task, queueId);
-            taskToQueueId = newRef;
-          }
         }
-      }
-      return queueId;
+        return queueId;
     }
 
     private void addReceiveCount(String from, int amount) {
@@ -182,57 +184,57 @@ class Server implements IConnection, IStatefulObject {
 
     /**
      * enqueue a received message 
-     * @param message
      * @throws InterruptedException
      */
     protected void enqueue(List<TaskMessage> msgs, String from) throws 
InterruptedException {
-      
-      if (null == msgs || msgs.size() == 0 || closing) {
-        return;
-      }
-      addReceiveCount(from, msgs.size());
-      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
-      
-      if (null == messageGroups || closing) {
-        return;
-      }
-      
-      for (int receiverId = 0; receiverId < messageGroups.length; 
receiverId++) {
-        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
-        if (null != msgGroup) {
-          message_queue[receiverId].put(msgGroup);
-          pendingMessages[receiverId].addAndGet(msgGroup.size());
+        if (null == msgs || msgs.size() == 0 || closing) {
+            return;
+        }
+        addReceiveCount(from, msgs.size());
+        ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+
+        if (null == messageGroups || closing) {
+            return;
+        }
+
+        for (int receiverId = 0; receiverId < messageGroups.length; 
receiverId++) {
+            ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+            if (null != msgGroup) {
+                message_queue[receiverId].put(msgGroup);
+                pendingMessages[receiverId].addAndGet(msgGroup.size());
+            }
         }
-      }
     }
-    
-    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
-      if (closing) {
-        return closeMessage.iterator();
-      }
-      
-      ArrayList<TaskMessage> ret = null; 
-      int queueId = receiverId % queueCount;
-      if ((flags & 0x01) == 0x01) { 
+
+    public Iterator<TaskMessage> recv(int flags, int receiverId) {
+        if (closing) {
+            return closeMessage.iterator();
+        }
+
+        ArrayList<TaskMessage> ret = null;
+        int queueId = receiverId % queueCount;
+        if ((flags & 0x01) == 0x01) {
             //non-blocking
             ret = message_queue[queueId].poll();
-        } else {
+        }
+        else {
             try {
                 ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
                 ret = request;
-            } catch (InterruptedException e) {
+            }
+            catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
                 ret = null;
             }
         }
-      
-      if (null != ret) {
-        messagesDequeued.addAndGet(ret.size());
-        pendingMessages[queueId].addAndGet(0 - ret.size());
-        return ret.iterator();
-      }
-      return null;
+
+        if (null != ret) {
+            messagesDequeued.addAndGet(ret.size());
+            pendingMessages[queueId].addAndGet(0 - ret.size());
+            return ret.iterator();
+        }
+        return null;
     }
    
     /**
@@ -264,11 +266,11 @@ class Server implements IConnection, IStatefulObject {
     }
 
     public void send(int task, byte[] message) {
-        throw new RuntimeException("Server connection should not send any 
messages");
+        throw new UnsupportedOperationException("Server connection should not 
send any messages");
     }
     
     public void send(Iterator<TaskMessage> msgs) {
-      throw new RuntimeException("Server connection should not send any 
messages");
+      throw new UnsupportedOperationException("Server connection should not 
send any messages");
     }
        
     public String name() {
@@ -276,8 +278,35 @@ class Server implements IConnection, IStatefulObject {
     }
 
     @Override
+    public Status status() {
+        if (closing) {
+          return Status.Closed;
+        }
+        else if (!connectionEstablished(allChannels)) {
+            return Status.Connecting;
+        }
+        else {
+            return Status.Ready;
+        }
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+      return channel != null && channel.isBound();
+    }
+
+    private boolean connectionEstablished(ChannelGroup allChannels) {
+        boolean allEstablished = true;
+        for (Channel channel : allChannels) {
+            if (!(connectionEstablished(channel))) {
+                allEstablished = false;
+                break;
+            }
+        }
+        return allEstablished;
+    }
+
     public Object getState() {
-        LOG.info("Getting metrics for server on " + port);
+        LOG.info("Getting metrics for server on port {}", port);
         HashMap<String, Object> ret = new HashMap<String, Object>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
         ArrayList<Integer> pending = new 
ArrayList<Integer>(pendingMessages.length);
@@ -300,4 +329,9 @@ class Server implements IConnection, IStatefulObject {
         ret.put("enqueued", enqueued);
         return ret;
     }
+
+    @Override public String toString() {
+       return String.format("Netty server listening on port %s", port);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
 
b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 1ea382b..2adfceb 100644
--- 
a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ 
b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -39,15 +39,14 @@ class StormClientPipelineFactory implements 
ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
 
-        boolean isNettyAuth = (Boolean) this.client.storm_conf
-                .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        boolean isNettyAuth = (Boolean) 
this.client.stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
             pipeline.addLast("saslClientHandler", new SaslStormClientHandler(
                     client));
         }
         // business logic.
-        pipeline.addLast("handler", new 
StormClientErrorHandler(client.name()));
+        pipeline.addLast("handler", new 
StormClientErrorHandler(client.dstAddressPrefixedName));
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj 
b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 2061ddf..b152af2 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -16,14 +16,36 @@
 (ns backtype.storm.messaging.netty-unit-test
   (:use [clojure test])
   (:import [backtype.storm.messaging TransportFactory])
-  (:use [backtype.storm bootstrap testing util]))
+  (:use [backtype.storm bootstrap testing util])
+  (:use [backtype.storm.daemon.worker :only [is-connection-ready]]))
 
 (bootstrap)
 
 (def port 6700)
 (def task 1)
 
+;; In a "real" cluster (or an integration test), Storm itself would ensure 
that a topology's workers would only be
+;; activated once all the workers' connections are ready.  The tests in this 
file however launch Netty servers and
+;; clients directly, and thus we must ensure manually that the server and the 
client connections are ready before we
+;; commence testing.  If we don't do this, then we will lose the first 
messages being sent between the client and the
+;; server, which will fail the tests.
+(defn- wait-until-ready
+  ([connections]
+      (do (log-message "Waiting until all Netty connections are ready...")
+          (wait-until-ready connections 0)))
+  ([connections waited-ms]
+    (let [interval-ms 10
+          max-wait-ms 5000]
+      (if-not (every? is-connection-ready connections)
+        (if (<= waited-ms max-wait-ms)
+          (do
+            (Thread/sleep interval-ms)
+            (wait-until-ready connections (+ waited-ms interval-ms)))
+          (throw (RuntimeException. (str "Netty connections were not ready 
within " max-wait-ms " ms"))))
+        (log-message "All Netty connections are ready")))))
+
 (deftest test-basic
+  (log-message "Should send and receive a basic message")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
@@ -37,6 +59,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -47,6 +70,7 @@
     (.term context)))
 
 (deftest test-large-msg
+  (log-message "Should send and receive a large message")
   (let [req_msg (apply str (repeat 2048000 'c'))
         storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
@@ -60,6 +84,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -69,39 +94,9 @@
     (.close server)
     (.term context)))
 
-(deftest test-server-delayed
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-       storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-AUTHENTICATION false
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        client (.connect context nil "localhost" port)
-
-        server (Thread.
-                (fn []
-                  (Thread/sleep 1000)
-                  (let [server (.bind context nil port)
-                        iter (.recv server 0 0)
-                        resp (.next iter)]
-                    (is (= task (.task resp)))
-                    (is (= req_msg (String. (.message resp))))
-                    (.close server)
-                  )))
-        _ (.start server)
-        _ (.send client task (.getBytes req_msg))
-        ]
-    (.close client)
-    (.join server)
-    (.term context)))
-
 (deftest test-batch
-  (let [storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
+  (let [num-messages 100000
+        storm-conf {STORM-MESSAGING-TRANSPORT 
"backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
@@ -110,23 +105,25 @@
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
                     }
+        _ (log-message "Should send and receive many messages (testing with " 
num-messages " messages)")
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
-        client (.connect context nil "localhost" port)]
-    (doseq [num  (range 1 100000)]
+        client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])]
+    (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
 
     (let [resp (ArrayList.)
           received (atom 0)]
-      (while (< @received (- 100000 1))
+      (while (< @received (- num-messages 1))
         (let [iter (.recv server 0 0)]
           (while (.hasNext iter)
             (let [msg (.next iter)]
               (.add resp msg)
               (swap! received inc)
               ))))
-      (doseq [num  (range 1 100000)]
+      (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)
             resp_msg (String. (.message (.get resp (- num 1))))]
         (is (= req_msg resp_msg)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/test/clj/backtype/storm/worker_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/worker_test.clj 
b/storm-core/test/clj/backtype/storm/worker_test.clj
new file mode 100644
index 0000000..f09baef
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/worker_test.clj
@@ -0,0 +1,38 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.worker-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
+  (:import [org.mockito Mockito])
+  (:use [backtype.storm bootstrap testing])
+  (:use [backtype.storm.daemon common])
+
+  (:require [backtype.storm.daemon [worker :as worker]])
+  )
+
+(bootstrap)
+
+(deftest test-worker-is-connection-ready
+  (let [connection (Mockito/mock ConnectionWithStatus)]
+    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Ready)
+    (is (= true (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Connecting)
+    (is (= false (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Closed)
+    (is (= false (worker/is-connection-ready connection)))
+  ))
\ No newline at end of file

Reply via email to