Repository: storm
Updated Branches:
  refs/heads/master ff9142dd0 -> 260830e33


Netty Fix.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11886c0c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11886c0c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11886c0c

Branch: refs/heads/master
Commit: 11886c0c4c3aa2547b1ef571712bed83229dc4e4
Parents: 4368781
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Sep 1 12:13:05 2015 -0500
Committer: Kyle Nusbaum <[email protected]>
Committed: Tue Sep 1 12:13:05 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 37 ++++++++++++--------
 1 file changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11886c0c/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 5b130fa..8300b1a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -122,6 +122,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     private final MessageBuffer batcher;
 
+    private final Object writeLock = new Object();
+    
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
         this.stormConf = stormConf;
@@ -245,21 +247,24 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-
-        while (msgs.hasNext()) {
-            TaskMessage message = msgs.next();
-            MessageBatch full = batcher.add(message);
-            if(full != null){
-                flushMessages(channel, full);
+        synchronized (writeLock) {
+            while (msgs.hasNext()) {
+                TaskMessage message = msgs.next();
+                MessageBatch full = batcher.add(message);
+                if(full != null){
+                    flushMessages(channel, full);
+                }
             }
         }
-
+            
         if(channel.isWritable()){
-            // Netty's internal buffer is not full and we still have message left in the buffer.
-            // We should write the unfilled MessageBatch immediately to reduce latency
-            MessageBatch batch = batcher.drain();
-            if(batch != null) {
-                flushMessages(channel, batch);
+            synchronized (writeLock) {
+                // Netty's internal buffer is not full and we still have message left in the buffer.
+                // We should write the unfilled MessageBatch immediately to reduce latency
+                MessageBatch batch = batcher.drain();
+                if(batch != null) {
+                    flushMessages(channel, batch);
+                }
             }
         } else {
             // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
@@ -444,9 +449,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     public void notifyInterestChanged(Channel channel) {
         if(channel.isWritable()){
-            // Channel is writable again, write if there are any messages pending
-            MessageBatch pending = batcher.drain();
-            flushMessages(channel, pending);
+            synchronized (writeLock) {
+                // Channel is writable again, write if there are any messages pending
+                MessageBatch pending = batcher.drain();
+                flushMessages(channel, pending);
+            }
         }
     }
 

Reply via email to