Repository: flink
Updated Branches:
  refs/heads/master 11643c0cc -> 0dea359b3


[FLINK-2134] Close Netty channel via CloseRequest msg

This closes #773.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dea359b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dea359b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dea359b

Branch: refs/heads/master
Commit: 0dea359b30c15abc07b5c9af8f775adf235a6cb0
Parents: 11643c0
Author: Ufuk Celebi <[email protected]>
Authored: Wed Jun 3 18:41:40 2015 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Jun 4 11:16:03 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 20 ++++++++++++++++++++
 .../network/netty/PartitionRequestClient.java   | 15 +++++++++++++--
 .../io/network/netty/PartitionRequestQueue.java |  6 ++++++
 .../netty/PartitionRequestServerHandler.java    |  4 ++++
 .../netty/NettyMessageSerializationTest.java    |  7 +++++++
 5 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 53afd03..1540369 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -150,6 +150,9 @@ abstract class NettyMessage {
                        else if (msgId == CancelPartitionRequest.ID) {
                                decodedMsg = new CancelPartitionRequest();
                        }
+                       else if (msgId == CloseRequest.ID) {
+                               decodedMsg = new CloseRequest();
+                       }
                        else {
                                throw new IllegalStateException("Received 
unknown message from producer: " + msg);
                        }
@@ -521,6 +524,23 @@ abstract class NettyMessage {
                }
        }
 
+       static class CloseRequest extends NettyMessage {
+
+               private static final byte ID = 5;
+
+               public CloseRequest() {
+               }
+
+               @Override
+               ByteBuf write(ByteBufAllocator allocator) throws Exception {
+                       return allocateBuffer(allocator, ID, 0);
+               }
+
+               @Override
+               void readFrom(ByteBuf buffer) throws Exception {
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        private static class ByteBufDataInputView implements DataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 3049af6..78f6398 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -96,6 +96,8 @@ public class PartitionRequestClient {
                        final RemoteInputChannel inputChannel,
                        int delayMs) throws IOException {
 
+               checkNotClosed();
+
                LOG.debug("Requesting subpartition {} of partition {} with {} 
ms delay.",
                                subpartitionIndex, partitionId, delayMs);
 
@@ -146,6 +148,7 @@ public class PartitionRequestClient {
         * consumer task run pipelined.
         */
        public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent 
event, final RemoteInputChannel inputChannel) throws IOException {
+               checkNotClosed();
 
                tcpChannel.writeAndFlush(new TaskEventRequest(event, 
partitionId, inputChannel.getInputChannelId()))
                                .addListener(
@@ -167,8 +170,10 @@ public class PartitionRequestClient {
                partitionRequestHandler.removeInputChannel(inputChannel);
 
                if (closeReferenceCounter.decrement()) {
-                       // Close the TCP connection
-                       tcpChannel.close();
+                       // Close the TCP connection. Send a close request msg 
to ensure
+                       // that outstanding backwards task events are not 
discarded.
+                       tcpChannel.writeAndFlush(new 
NettyMessage.CloseRequest())
+                                       
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 
                        // Make sure to remove the client from the factory
                        
clientFactory.destroyPartitionRequestClient(connectionId, this);
@@ -177,4 +182,10 @@ public class PartitionRequestClient {
                        
partitionRequestHandler.cancelRequestFor(inputChannel.getInputChannelId());
                }
        }
+
+       private void checkNotClosed() throws IOException {
+               if (closeReferenceCounter.isDisposed()) {
+                       throw new LocalTransportException("Channel closed.", 
tcpChannel.localAddress());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 5301195..bb8c851 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -79,6 +79,12 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                ctx.pipeline().fireUserEventTriggered(receiverId);
        }
 
+       public void close() {
+               if (ctx != null) {
+                       ctx.channel().close();
+               }
+       }
+
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object msg) 
throws Exception {
                if (msg.getClass() == SequenceNumberingSubpartitionView.class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 90c93e5..e278d07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -120,6 +121,9 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
 
                                outboundQueue.cancel(request.receiverId);
                        }
+                       else if (msgClazz == CloseRequest.class) {
+                               outboundQueue.close();
+                       }
                        else {
                                LOG.warn("Received unexpected client request: 
{}", msg);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dea359b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index b1315be..60241e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -147,6 +147,13 @@ public class NettyMessageSerializationTest {
 
                        assertEquals(expected.receiverId, actual.receiverId);
                }
+
+               {
+                       NettyMessage.CloseRequest expected = new 
NettyMessage.CloseRequest();
+                       NettyMessage.CloseRequest actual = 
encodeAndDecode(expected);
+
+                       assertEquals(expected.getClass(), actual.getClass());
+               }
        }
 
        @SuppressWarnings("unchecked")

Reply via email to