Repository: incubator-apex-core Updated Branches: refs/heads/master 9eba29de8 -> 6ff4743d0
APEXCORE-398 - Ack may not be delivered from buffer server to its client Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6ff4743d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6ff4743d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6ff4743d Branch: refs/heads/master Commit: 6ff4743d0df8edeb16bfa62decfdbc552aa8a28a Parents: 9eba29d Author: Vlad Rozov <[email protected]> Authored: Mon Mar 21 13:22:58 2016 -0700 Committer: Thomas Weise <[email protected]> Committed: Mon Mar 21 15:30:20 2016 -0700 ---------------------------------------------------------------------- .../com/datatorrent/bufferserver/server/Server.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6ff4743d/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 8a1fac7..85030dd 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -191,7 +191,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver purge ack message. 
" + ctx + "send buffers are full."); + } } private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException @@ -213,7 +218,12 @@ public class Server implements ServerListener final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); - ctx.write(tuple); + if (ctx.write(tuple)) { + ctx.write(); + } else { + logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx); + throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full."); + } } /** @@ -376,6 +386,7 @@ public class Server implements ServerListener key.attach(client); key.interestOps(SelectionKey.OP_READ); client.registered(key); + client.connected(); int len = writeOffset - readOffset - size; if (len > 0) {
