Update client and server close() to wait for the close to complete before returning, to avoid spurious allocator test failures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/92408f19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/92408f19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/92408f19 Branch: refs/heads/merge_2014_05_23 Commit: 92408f193043fa3049d114930a97ecf2332e08c0 Parents: 8570365 Author: Jacques Nadeau <jacq...@apache.org> Authored: Thu May 22 20:42:14 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Fri May 23 09:34:03 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/rpc/BasicClient.java | 20 +++++++++++++------- .../org/apache/drill/exec/rpc/BasicServer.java | 7 ++++++- 2 files changed, 19 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/92408f19/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index 1d9fecf..2a3266a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.rpc; +import java.util.concurrent.ExecutionException; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -54,7 +56,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection this.responseClass = responseClass; this.handshakeType = handshakeType; this.handshakeParser = handshakeParser; - + b = new Bootstrap() // .group(eventLoopGroup) // .channel(NioSocketChannel.class) // @@ -69,7 +71,7 @@ public 
abstract class BasicClient<T extends EnumLite, R extends RemoteConnection // logger.debug("initializing client connection."); connection = initRemoteConnection(ch); ch.closeFuture().addListener(getCloseHandler(connection)); - + ch.pipeline().addLast( // getDecoder(connection.getAllocator()), // new RpcDecoder("c-" + rpcConfig.getName()), // @@ -84,7 +86,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection ; } - + public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator); public boolean isActive(){ @@ -93,7 +95,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException; protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection); - + protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) { return new ChannelClosedHandler(); } @@ -125,7 +127,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) { assert l != null; assert handshakeValue != null; - + this.l = l; this.handshakeValue = handshakeValue; } @@ -204,10 +206,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection public void setAutoRead(boolean enableAutoRead){ connection.setAutoRead(enableAutoRead); } - + public void close() { logger.debug("Closing client"); - connection.getChannel().close(); + try { + connection.getChannel().close().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Failure whiel shutting {}", this.getClass().getName(), e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/92408f19/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index 8f533e3..f99365e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -32,6 +32,7 @@ import io.netty.handler.logging.LoggingHandler; import java.io.IOException; import java.net.BindException; +import java.util.concurrent.ExecutionException; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; @@ -162,7 +163,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection @Override public void close() throws IOException { - eventLoopGroup.shutdownGracefully(); + try { + eventLoopGroup.shutdownGracefully().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e); + } } }