Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 39d675d61 -> d8416f7e0
APEXCORE-398 - Ack may not be delivered from buffer server to it's client Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d8416f7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d8416f7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d8416f7e Branch: refs/heads/release-3.2 Commit: d8416f7e00af56df6000e1be1c143b0e18baaf61 Parents: 39d675d Author: Vlad Rozov <[email protected]> Authored: Mon Mar 21 13:22:58 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Mar 21 13:22:58 2016 -0700 ---------------------------------------------------------------------- .../com/datatorrent/bufferserver/server/Server.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8416f7e/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c39605c..65fcf51 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -188,7 +188,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver purge ack message. " + ctx + "send buffers are full."); + } } private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException @@ -210,7 +215,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full."); + } } /** @@ -369,6 +379,7 @@ public class Server implements ServerListener key.attach(client); key.interestOps(SelectionKey.OP_READ); client.registered(key); + client.connected(); int len = writeOffset - readOffset - size; if (len > 0) {
