STORM-835 Netty Client holds batch object until I/O operation completes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a230e5b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a230e5b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a230e5b Branch: refs/heads/0.10.x-branch Commit: 6a230e5b8f304985a40e36d363e985e3a68c1800 Parents: cdce04f Author: zhanghailei <[email protected]> Authored: Mon May 25 10:00:12 2015 +0800 Committer: P. Taylor Goetz <[email protected]> Committed: Fri May 29 12:10:31 2015 -0400 ---------------------------------------------------------------------- storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6a230e5b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index f332bb3..ac3f3f2 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -461,7 +461,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { * * If the write operation fails, then we will close the channel and trigger a reconnect. 
*/ - private synchronized void flushMessages(Channel channel, final MessageBatch batch) { + private synchronized void flushMessages(Channel channel, MessageBatch batch) { if (!containsMessages(batch)) { return; } @@ -476,7 +476,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { pendingMessages.getAndAdd(0 - numMessages); if (future.isSuccess()) { LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName); - messagesSent.getAndAdd(batch.size()); + messagesSent.getAndAdd(numMessages); } else { LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
