Repository: storm Updated Branches: refs/heads/0.9.x-branch 19a4e129e -> ed2adde71
Netty Fix. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8a295935 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8a295935 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8a295935 Branch: refs/heads/0.9.x-branch Commit: 8a29593539869e9144af45e8da11bf719f435d05 Parents: 19a4e12 Author: Kyle Nusbaum <[email protected]> Authored: Tue Sep 1 12:13:05 2015 -0500 Committer: Jungtaek Lim <[email protected]> Committed: Sat Oct 17 23:11:48 2015 +0900 ---------------------------------------------------------------------- .../backtype/storm/messaging/netty/Client.java | 37 ++++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8a295935/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index 067dddc..56ed300 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -119,6 +119,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { private final MessageBuffer batcher; + private final Object writeLock = new Object(); + @SuppressWarnings("rawtypes") Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) { closing = false; @@ -240,21 +242,24 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { return; } - - while (msgs.hasNext()) { - TaskMessage message = msgs.next(); - MessageBatch full = batcher.add(message); - if(full != null){ - flushMessages(channel, full); + synchronized (writeLock) { + while (msgs.hasNext()) { + TaskMessage message = msgs.next(); + MessageBatch full = batcher.add(message); + if(full != null){ + flushMessages(channel, full); + } } } - + if(channel.isWritable()){ - // Netty's internal buffer is not full and we still have message left in the buffer. - // We should write the unfilled MessageBatch immediately to reduce latency - MessageBatch batch = batcher.drain(); - if(batch != null) { - flushMessages(channel, batch); + synchronized (writeLock) { + // Netty's internal buffer is not full and we still have message left in the buffer. + // We should write the unfilled MessageBatch immediately to reduce latency + MessageBatch batch = batcher.drain(); + if(batch != null) { + flushMessages(channel, batch); + } } } else { // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger @@ -434,9 +439,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { */ public void notifyInterestChanged(Channel channel) { if(channel.isWritable()){ - // Channel is writable again, write if there are any messages pending - MessageBatch pending = batcher.drain(); - flushMessages(channel, pending); + synchronized (writeLock) { + // Channel is writable again, write if there are any messages pending + MessageBatch pending = batcher.drain(); + flushMessages(channel, pending); + } } }
