Repository: flink
Updated Branches:
  refs/heads/master 7ce42c2e7 -> f79168052


[FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown

Due to a bug in Netty that was fixed only in 4.0.33.Final, waiting on
the termination future may never complete.

Netty issue: https://github.com/netty/netty/issues/4357


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7916805
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7916805
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7916805

Branch: refs/heads/master
Commit: f79168052c776fe36dc2678d63ffccf715403753
Parents: 7ce42c2
Author: Ufuk Celebi <[email protected]>
Authored: Wed Aug 17 18:04:40 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Aug 17 18:06:21 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/query/netty/KvStateClient.java  | 3 ++-
 .../org/apache/flink/runtime/query/netty/KvStateServer.java  | 8 +-------
 .../apache/flink/runtime/query/netty/KvStateClientTest.java  | 2 --
 3 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
index 6cfe86b..01093ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
@@ -46,6 +46,7 @@ import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -195,7 +196,7 @@ public class KvStateClient {
                        if (bootstrap != null) {
                                EventLoopGroup group = bootstrap.group();
                                if (group != null) {
-                                       group.shutdownGracefully();
+                                       group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
index 0c0c5ec..4787390 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.concurrent.Future;
 import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -180,12 +179,7 @@ public class KvStateServer {
                if (bootstrap != null) {
                        EventLoopGroup group = bootstrap.group();
                        if (group != null) {
-                               Future<?> shutDownFuture = 
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
-                               try {
-                                       shutDownFuture.await();
-                               } catch (InterruptedException e) {
-                                       LOG.error("Interrupted during shut 
down", e);
-                               }
+                               group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 4c42318..ac03f94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemValueState;
 import org.apache.flink.util.NetUtils;
 import org.junit.AfterClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -516,7 +515,6 @@ public class KvStateClientTest {
         * that all ongoing requests are failed.
         */
        @Test
-       @Ignore
        public void testClientServerIntegration() throws Exception {
                // Config
                final int numServers = 2;

Reply via email to