Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 68964195a -> ef1caa4d4
APEXCORE-398 - Ack may not be delivered from buffer server to its client Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ef1caa4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ef1caa4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ef1caa4d Branch: refs/heads/release-3.3 Commit: ef1caa4d4f9d361eeb151c96c5e8f17c8d63cdca Parents: 6896419 Author: Vlad Rozov <[email protected]> Authored: Mon Mar 21 13:22:58 2016 -0700 Committer: Thomas Weise <[email protected]> Committed: Mon Mar 21 15:30:52 2016 -0700 ---------------------------------------------------------------------- .../com/datatorrent/bufferserver/server/Server.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ef1caa4d/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 8a1fac7..85030dd 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -191,7 +191,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver purge ack message. 
" + ctx + "send buffers are full."); + } } private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException @@ -213,7 +218,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full."); + } } /** @@ -376,6 +386,7 @@ public class Server implements ServerListener key.attach(client); key.interestOps(SelectionKey.OP_READ); client.registered(key); + client.connected(); int len = writeOffset - readOffset - size; if (len > 0) {
