Repository: flink Updated Branches: refs/heads/master 7ce42c2e7 -> f79168052
[FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown. Due to a bug in Netty that was only fixed in 4.0.33.Final, waiting on the termination future may never complete. Netty issue: https://github.com/netty/netty/issues/4357 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7916805 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7916805 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7916805 Branch: refs/heads/master Commit: f79168052c776fe36dc2678d63ffccf715403753 Parents: 7ce42c2 Author: Ufuk Celebi <[email protected]> Authored: Wed Aug 17 18:04:40 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Wed Aug 17 18:06:21 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/query/netty/KvStateClient.java | 3 ++- .../org/apache/flink/runtime/query/netty/KvStateServer.java | 8 +------- .../apache/flink/runtime/query/netty/KvStateClientTest.java | 2 -- 3 files changed, 3 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java index 6cfe86b..01093ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java @@ -46,6 +46,7 @@ import java.util.ArrayDeque; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -195,7 +196,7 @@ public class KvStateClient { if (bootstrap != null) { EventLoopGroup group = bootstrap.group(); if (group != null) { - group.shutdownGracefully(); + group.shutdownGracefully(0, 10, TimeUnit.SECONDS); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java index 0c0c5ec..4787390 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java @@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.stream.ChunkedWriteHandler; -import io.netty.util.concurrent.Future; import org.apache.flink.runtime.io.network.netty.NettyBufferPool; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServerAddress; @@ -180,12 +179,7 @@ public class KvStateServer { if (bootstrap != null) { EventLoopGroup group = bootstrap.group(); if (group != null) { - Future<?> shutDownFuture = group.shutdownGracefully(0, 10, TimeUnit.SECONDS); - try { - shutDownFuture.await(); - } catch (InterruptedException e) { - LOG.error("Interrupted during shut down", e); - } + group.shutdownGracefully(0, 10, TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index 4c42318..ac03f94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemValueState; import org.apache.flink.util.NetUtils; import org.junit.AfterClass; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -516,7 +515,6 @@ public class KvStateClientTest { * that all ongoing requests are failed. */ @Test - @Ignore public void testClientServerIntegration() throws Exception { // Config final int numServers = 2;
