Update client and server to wait to close before returning to avoid spurious 
allocator test failures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/92408f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/92408f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/92408f19

Branch: refs/heads/merge_2014_05_23
Commit: 92408f193043fa3049d114930a97ecf2332e08c0
Parents: 8570365
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Thu May 22 20:42:14 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Fri May 23 09:34:03 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/BasicClient.java  | 20 +++++++++++++-------
 .../org/apache/drill/exec/rpc/BasicServer.java  |  7 ++++++-
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/92408f19/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 1d9fecf..2a3266a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.rpc;
 
+import java.util.concurrent.ExecutionException;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -54,7 +56,7 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
     this.responseClass = responseClass;
     this.handshakeType = handshakeType;
     this.handshakeParser = handshakeParser;
-    
+
     b = new Bootstrap() //
         .group(eventLoopGroup) //
         .channel(NioSocketChannel.class) //
@@ -69,7 +71,7 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
 //            logger.debug("initializing client connection.");
             connection = initRemoteConnection(ch);
             ch.closeFuture().addListener(getCloseHandler(connection));
-            
+
             ch.pipeline().addLast( //
                 getDecoder(connection.getAllocator()), //
                 new RpcDecoder("c-" + rpcConfig.getName()), //
@@ -84,7 +86,7 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
 
     ;
   }
-  
+
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
 
   public boolean isActive(){
@@ -93,7 +95,7 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
 
   protected abstract void validateHandshake(HANDSHAKE_RESPONSE 
validateHandshake) throws RpcException;
   protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R 
connection);
-  
+
   protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel 
channel) {
     return new ChannelClosedHandler();
   }
@@ -125,7 +127,7 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
     public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND 
handshakeValue) {
       assert l != null;
       assert handshakeValue != null;
-          
+
       this.l = l;
       this.handshakeValue = handshakeValue;
     }
@@ -204,10 +206,14 @@ public abstract class BasicClient<T extends EnumLite, R 
extends RemoteConnection
   public void setAutoRead(boolean enableAutoRead){
     connection.setAutoRead(enableAutoRead);
   }
-  
+
   public void close() {
     logger.debug("Closing client");
-    connection.getChannel().close();
+    try {
+      connection.getChannel().close().get();
+    } catch (InterruptedException | ExecutionException e) {
+      logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/92408f19/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 8f533e3..f99365e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -32,6 +32,7 @@ import io.netty.handler.logging.LoggingHandler;
 
 import java.io.IOException;
 import java.net.BindException;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -162,7 +163,11 @@ public abstract class BasicServer<T extends EnumLite, C 
extends RemoteConnection
 
   @Override
   public void close() throws IOException {
-    eventLoopGroup.shutdownGracefully();
+    try {
+      eventLoopGroup.shutdownGracefully().get();
+    } catch (InterruptedException | ExecutionException e) {
+      logger.warn("Failure while shutting down {}. ", 
this.getClass().getName(), e);
+    }
   }
 
 }

Reply via email to