Repository: storm
Updated Branches:
  refs/heads/0.9.x-branch 19a4e129e -> ed2adde71


Netty Fix.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8a295935
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8a295935
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8a295935

Branch: refs/heads/0.9.x-branch
Commit: 8a29593539869e9144af45e8da11bf719f435d05
Parents: 19a4e12
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Sep 1 12:13:05 2015 -0500
Committer: Jungtaek Lim <[email protected]>
Committed: Sat Oct 17 23:11:48 2015 +0900

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 37 ++++++++++++--------
 1 file changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8a295935/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 067dddc..56ed300 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -119,6 +119,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     private final MessageBuffer batcher;
 
+    private final Object writeLock = new Object();
+    
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
         closing = false;
@@ -240,21 +242,24 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-
-        while (msgs.hasNext()) {
-            TaskMessage message = msgs.next();
-            MessageBatch full = batcher.add(message);
-            if(full != null){
-                flushMessages(channel, full);
+        synchronized (writeLock) {
+            while (msgs.hasNext()) {
+                TaskMessage message = msgs.next();
+                MessageBatch full = batcher.add(message);
+                if(full != null){
+                    flushMessages(channel, full);
+                }
             }
         }
-
+            
         if(channel.isWritable()){
-            // Netty's internal buffer is not full and we still have message left in the buffer.
-            // We should write the unfilled MessageBatch immediately to reduce latency
-            MessageBatch batch = batcher.drain();
-            if(batch != null) {
-                flushMessages(channel, batch);
+            synchronized (writeLock) {
+                // Netty's internal buffer is not full and we still have message left in the buffer.
+                // We should write the unfilled MessageBatch immediately to reduce latency
+                MessageBatch batch = batcher.drain();
+                if(batch != null) {
+                    flushMessages(channel, batch);
+                }
             }
         } else {
             // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
@@ -434,9 +439,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     public void notifyInterestChanged(Channel channel) {
         if(channel.isWritable()){
-            // Channel is writable again, write if there are any messages pending
-            MessageBatch pending = batcher.drain();
-            flushMessages(channel, pending);
+            synchronized (writeLock) {
+                // Channel is writable again, write if there are any messages pending
+                MessageBatch pending = batcher.drain();
+                flushMessages(channel, pending);
+            }
         }
     }
 

Reply via email to