Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 9eba29de8 -> 6ff4743d0


APEXCORE-398 - Ack may not be delivered from buffer server to its client


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6ff4743d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6ff4743d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6ff4743d

Branch: refs/heads/master
Commit: 6ff4743d0df8edeb16bfa62decfdbc552aa8a28a
Parents: 9eba29d
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 21 13:22:58 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Mon Mar 21 15:30:20 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/bufferserver/server/Server.java  | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6ff4743d/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 8a1fac7..85030dd 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -191,7 +191,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, 
message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver purge ack message. {} send buffers are 
full.", ctx);
+      throw new RuntimeException("Failed to deliver purge ack message. " + ctx 
+ "send buffers are full.");
+    }
   }
 
   private void handleResetRequest(ResetRequestTuple request, final 
AbstractLengthPrependerClient ctx) throws IOException
@@ -213,7 +218,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, 
message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver reset ack message. {} send buffers are 
full.", ctx);
+      throw new RuntimeException("Failed to deliver reset ack message. " + ctx 
+ "send buffers are full.");
+    }
   }
 
   /**
@@ -376,6 +386,7 @@ public class Server implements ServerListener
       key.attach(client);
       key.interestOps(SelectionKey.OP_READ);
       client.registered(key);
+      client.connected();
 
       int len = writeOffset - readOffset - size;
       if (len > 0) {

Reply via email to