Repository: flink
Updated Branches:
  refs/heads/master 58b9a3772 -> d433ba9f0


[FLINK-2177] [runtime] Fix possible NPE when closing Netty channel before it is active


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d433ba9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d433ba9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d433ba9f

Branch: refs/heads/master
Commit: d433ba9f032e5361ae894562b7a8be13cd3efe13
Parents: 58b9a37
Author: Ufuk Celebi <u...@apache.org>
Authored: Mon Jun 8 07:14:07 2015 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Mon Jun 8 07:14:07 2015 +0200

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java        |  6 +++++-
 .../netty/PartitionRequestClientHandlerTest.java    | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 508cac9..51b436b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -66,7 +66,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
         */
        private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = 
Maps.newConcurrentMap();
 
-       private ChannelHandlerContext ctx;
+       private volatile ChannelHandlerContext ctx;
 
        // 
------------------------------------------------------------------------
        // Input channel/receiver registration
@@ -85,6 +85,10 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
        }
 
        void cancelRequestFor(InputChannelID inputChannelId) {
+               if (inputChannelId == null || ctx == null) {
+                       return;
+               }
+
                if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == 
null) {
                        ctx.writeAndFlush(new 
NettyMessage.CancelPartitionRequest(inputChannelId));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index b8e9f25..2c08cc5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -142,6 +142,22 @@ public class PartitionRequestClientHandlerTest {
                verify(inputChannel, times(1)).onFailedPartitionRequest();
        }
 
+       @Test
+       public void testCancelBeforeActive() throws Exception {
+
+               final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
+               when(inputChannel.getInputChannelId()).thenReturn(new 
InputChannelID());
+
+               final PartitionRequestClientHandler client = new 
PartitionRequestClientHandler();
+               client.addInputChannel(inputChannel);
+
+               // Don't throw NPE
+               client.cancelRequestFor(null);
+
+               // Don't throw NPE, because channel is not active yet
+               client.cancelRequestFor(inputChannel.getInputChannelId());
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        /**

Reply via email to