STORM-835 Netty Client hold batch object until io operation complete

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a230e5b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a230e5b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a230e5b

Branch: refs/heads/0.10.x-branch
Commit: 6a230e5b8f304985a40e36d363e985e3a68c1800
Parents: cdce04f
Author: zhanghailei <[email protected]>
Authored: Mon May 25 10:00:12 2015 +0800
Committer: P. Taylor Goetz <[email protected]>
Committed: Fri May 29 12:10:31 2015 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a230e5b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java 
b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f332bb3..ac3f3f2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -461,7 +461,7 @@ public class Client extends ConnectionWithStatus implements 
IStatefulObject {
      *
      * If the write operation fails, then we will close the channel and 
trigger a reconnect.
      */
-    private synchronized void flushMessages(Channel channel, final 
MessageBatch batch) {
+    private synchronized void flushMessages(Channel channel, MessageBatch 
batch) {
         if (!containsMessages(batch)) {
             return;
         }
@@ -476,7 +476,7 @@ public class Client extends ConnectionWithStatus implements 
IStatefulObject {
                 pendingMessages.getAndAdd(0 - numMessages);
                 if (future.isSuccess()) {
                     LOG.debug("sent {} messages to {}", numMessages, 
dstAddressPrefixedName);
-                    messagesSent.getAndAdd(batch.size());
+                    messagesSent.getAndAdd(numMessages);
                 }
                 else {
                     LOG.error("failed to send {} messages to {}: {}", 
numMessages, dstAddressPrefixedName,

Reply via email to