Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 68964195a -> ef1caa4d4


APEXCORE-398 - Ack may not be delivered from buffer server to its client


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ef1caa4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ef1caa4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ef1caa4d

Branch: refs/heads/release-3.3
Commit: ef1caa4d4f9d361eeb151c96c5e8f17c8d63cdca
Parents: 6896419
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 21 13:22:58 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Mon Mar 21 15:30:52 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/bufferserver/server/Server.java  | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ef1caa4d/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 8a1fac7..85030dd 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -191,7 +191,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, 
message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver purge ack message. {} send buffers are 
full.", ctx);
+      throw new RuntimeException("Failed to deliver purge ack message. " + ctx 
+ "send buffers are full.");
+    }
   }
 
   private void handleResetRequest(ResetRequestTuple request, final 
AbstractLengthPrependerClient ctx) throws IOException
@@ -213,7 +218,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, 
message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver reset ack message. {} send buffers are 
full.", ctx);
+      throw new RuntimeException("Failed to deliver reset ack message. " + ctx 
+ "send buffers are full.");
+    }
   }
 
   /**
@@ -376,6 +386,7 @@ public class Server implements ServerListener
       key.attach(client);
       key.interestOps(SelectionKey.OP_READ);
       client.registered(key);
+      client.connected();
 
       int len = writeOffset - readOffset - size;
       if (len > 0) {

Reply via email to