Repository: flink Updated Branches: refs/heads/master 11643c0cc -> 0dea359b3
[FLINK-2134] Close Netty channel via CloseRequest msg This closes #773. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dea359b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dea359b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dea359b Branch: refs/heads/master Commit: 0dea359b30c15abc07b5c9af8f775adf235a6cb0 Parents: 11643c0 Author: Ufuk Celebi <[email protected]> Authored: Wed Jun 3 18:41:40 2015 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Thu Jun 4 11:16:03 2015 +0200 ---------------------------------------------------------------------- .../runtime/io/network/netty/NettyMessage.java | 20 ++++++++++++++++++++ .../network/netty/PartitionRequestClient.java | 15 +++++++++++++-- .../io/network/netty/PartitionRequestQueue.java | 6 ++++++ .../netty/PartitionRequestServerHandler.java | 4 ++++ .../netty/NettyMessageSerializationTest.java | 7 +++++++ 5 files changed, 50 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index 53afd03..1540369 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -150,6 +150,9 @@ abstract class NettyMessage { else if (msgId == CancelPartitionRequest.ID) { decodedMsg = new CancelPartitionRequest(); } + else if (msgId == CloseRequest.ID) { + decodedMsg = new CloseRequest(); + } else { throw new IllegalStateException("Received unknown message from producer: " + msg); } @@ -521,6 +524,23 @@ abstract class NettyMessage { } } + static class CloseRequest extends NettyMessage { + + private static final byte ID = 5; + + public CloseRequest() { + } + + @Override + ByteBuf write(ByteBufAllocator allocator) throws Exception { + return allocateBuffer(allocator, ID, 0); + } + + @Override + void readFrom(ByteBuf buffer) throws Exception { + } + } + // ------------------------------------------------------------------------ private static class ByteBufDataInputView implements DataInputView { http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index 3049af6..78f6398 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -96,6 +96,8 @@ public class PartitionRequestClient { final RemoteInputChannel inputChannel, int delayMs) throws IOException { + checkNotClosed(); + LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", subpartitionIndex, partitionId, delayMs); @@ -146,6 +148,7 @@ public class PartitionRequestClient { * consumer task run pipelined. */ public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException { + checkNotClosed(); tcpChannel.writeAndFlush(new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId())) .addListener( @@ -167,8 +170,10 @@ public class PartitionRequestClient { partitionRequestHandler.removeInputChannel(inputChannel); if (closeReferenceCounter.decrement()) { - // Close the TCP connection - tcpChannel.close(); + // Close the TCP connection. Send a close request msg to ensure + // that outstanding backwards task events are not discarded. + tcpChannel.writeAndFlush(new NettyMessage.CloseRequest()) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // Make sure to remove the client from the factory clientFactory.destroyPartitionRequestClient(connectionId, this); @@ -177,4 +182,10 @@ public class PartitionRequestClient { partitionRequestHandler.cancelRequestFor(inputChannel.getInputChannelId()); } } + + private void checkNotClosed() throws IOException { + if (closeReferenceCounter.isDisposed()) { + throw new LocalTransportException("Channel closed.", tcpChannel.localAddress()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index 5301195..bb8c851 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -79,6 +79,12 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { ctx.pipeline().fireUserEventTriggered(receiverId); } + public void close() { + if (ctx != null) { + ctx.channel().close(); + } + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg.getClass() == SequenceNumberingSubpartitionView.class) { http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java index 90c93e5..e278d07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest; +import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; @@ -120,6 +121,9 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes outboundQueue.cancel(request.receiverId); } + else if (msgClazz == CloseRequest.class) { + outboundQueue.close(); + } else { LOG.warn("Received unexpected client request: {}", msg); } http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java index b1315be..60241e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java @@ -147,6 +147,13 @@ public class NettyMessageSerializationTest { assertEquals(expected.receiverId, actual.receiverId); } + + { + NettyMessage.CloseRequest expected = new NettyMessage.CloseRequest(); + NettyMessage.CloseRequest actual = encodeAndDecode(expected); + + assertEquals(expected.getClass(), actual.getClass()); + } } @SuppressWarnings("unchecked")
