Repository: storm Updated Branches: refs/heads/0.10.x-branch e378dbef4 -> 631d1cb2e
Netty Fix. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc00ca47 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc00ca47 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc00ca47 Branch: refs/heads/0.10.x-branch Commit: fc00ca47097be89478fa3ac9f296b74b8cbde43f Parents: e378dbe Author: Kyle Nusbaum <[email protected]> Authored: Tue Sep 1 12:13:05 2015 -0500 Committer: Jungtaek Lim <[email protected]> Committed: Sat Sep 5 10:49:37 2015 +0900 ---------------------------------------------------------------------- .../backtype/storm/messaging/netty/Client.java | 37 ++++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fc00ca47/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index 5b130fa..8300b1a 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -122,6 +122,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { private final MessageBuffer batcher; + private final Object writeLock = new Object(); + @SuppressWarnings("rawtypes") Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) { this.stormConf = stormConf; @@ -245,21 +247,24 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { return; } - - while (msgs.hasNext()) { - TaskMessage message = msgs.next(); - MessageBatch full = batcher.add(message); - if(full != null){ - flushMessages(channel, full); + synchronized (writeLock) { + while (msgs.hasNext()) { + TaskMessage message = msgs.next(); + MessageBatch full = batcher.add(message); + if(full != null){ + flushMessages(channel, full); + } } } - + if(channel.isWritable()){ - // Netty's internal buffer is not full and we still have message left in the buffer. - // We should write the unfilled MessageBatch immediately to reduce latency - MessageBatch batch = batcher.drain(); - if(batch != null) { - flushMessages(channel, batch); + synchronized (writeLock) { + // Netty's internal buffer is not full and we still have message left in the buffer. + // We should write the unfilled MessageBatch immediately to reduce latency + MessageBatch batch = batcher.drain(); + if(batch != null) { + flushMessages(channel, batch); + } } } else { // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger @@ -444,9 +449,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { */ public void notifyInterestChanged(Channel channel) { if(channel.isWritable()){ - // Channel is writable again, write if there are any messages pending - MessageBatch pending = batcher.drain(); - flushMessages(channel, pending); + synchronized (writeLock) { + // Channel is writable again, write if there are any messages pending + MessageBatch pending = batcher.drain(); + flushMessages(channel, pending); + } } }
