HBASE-15798 Add Async RpcChannels to all RpcClients

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a11091c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a11091c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a11091c4

Branch: refs/heads/hbase-12439
Commit: a11091c49cdd67c0ad37e8a55e87f430e24da4f1
Parents: 3b74b6f
Author: Jurriaan Mous <jurm...@jurmo.us>
Authored: Sun May 8 10:20:15 2016 +0200
Committer: stack <st...@apache.org>
Committed: Tue May 10 07:44:51 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  34 +++-
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |  26 ++-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |   9 +-
 .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java   |   8 +-
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |  66 +++---
 .../hadoop/hbase/ipc/MessageConverter.java      |   2 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  |  33 ++-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 201 +++++++++++++++++-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 203 +++++++++++++++----
 .../apache/hadoop/hbase/ipc/TestAsyncIPC.java   | 121 +----------
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |   7 +-
 11 files changed, 483 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index c091d1d..71c8875 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -271,6 +271,27 @@ public abstract class AbstractRpcClient implements 
RpcClient {
   }
 
   /**
+   * Returns a {@link PayloadCarryingRpcController} that is guaranteed to carry a call timeout.
+   * <p>
+   * A controller that already is a PayloadCarryingRpcController is reused and only receives
+   * the default timeout when it has none of its own. Any other controller (including
+   * <code>null</code>) is replaced by a fresh instance that uses the default timeout.
+   * @param controller controller supplied by the caller, may be null
+   * @param channelOperationTimeout default timeout applied when the controller has none
+   * @return the reused or newly created controller
+   */
+  static PayloadCarryingRpcController configurePayloadCarryingRpcController(
+      RpcController controller, int channelOperationTimeout) {
+    // instanceof is false for null, so a missing controller takes the "create new" path.
+    if (!(controller instanceof PayloadCarryingRpcController)) {
+      PayloadCarryingRpcController created = new PayloadCarryingRpcController();
+      created.setCallTimeout(channelOperationTimeout);
+      return created;
+    }
+
+    PayloadCarryingRpcController existing = (PayloadCarryingRpcController) controller;
+    if (!existing.hasCallTimeout()) {
+      existing.setCallTimeout(channelOperationTimeout);
+    }
+    return existing;
+  }
+
+  /**
    * Takes an Exception and the address we were trying to connect to and 
return an IOException with
    * the input exception as the cause. The new exception provides the stack 
trace of the place where
    * the exception is thrown and some extra diagnostics information. If the 
exception is
@@ -321,16 +342,9 @@ public abstract class AbstractRpcClient implements 
RpcClient {
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, 
RpcController controller,
         Message param, Message returnType) throws ServiceException {
-      PayloadCarryingRpcController pcrc;
-      if (controller != null && controller instanceof 
PayloadCarryingRpcController) {
-        pcrc = (PayloadCarryingRpcController) controller;
-        if (!pcrc.hasCallTimeout()) {
-          pcrc.setCallTimeout(channelOperationTimeout);
-        }
-      } else {
-        pcrc = new PayloadCarryingRpcController();
-        pcrc.setCallTimeout(channelOperationTimeout);
-      }
+      PayloadCarryingRpcController pcrc = 
configurePayloadCarryingRpcController(
+          controller,
+          channelOperationTimeout);
 
       return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, 
this.ticket, this.isa);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 3acf280..89e6ca4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -51,14 +51,15 @@ public class AsyncCall<M extends Message, T> extends 
Promise<T> {
   final Message responseDefaultType;
 
   private final MessageConverter<M,T> messageConverter;
-  final long startTime;
-  final long rpcTimeout;
   private final IOExceptionConverter exceptionConverter;
 
+  final long rpcTimeout;
+
   // For only the request
   private final CellScanner cellScanner;
   private final int priority;
 
+  final MetricsConnection clientMetrics;
   final MetricsConnection.CallStats callStats;
 
   /**
@@ -71,13 +72,15 @@ public class AsyncCall<M extends Message, T> extends 
Promise<T> {
    * @param cellScanner         cellScanner containing cells to send as request
    * @param responseDefaultType the default response type
    * @param messageConverter    converts the messages to what is the expected 
output
+   * @param exceptionConverter  converts exceptions to expected format. Can be 
null
    * @param rpcTimeout          timeout for this call in ms
    * @param priority            for this request
+   * @param metrics             MetricsConnection to which the metrics are 
stored for this request
    */
   public AsyncCall(AsyncRpcChannelImpl channel, int connectId, 
Descriptors.MethodDescriptor
         md, Message param, CellScanner cellScanner, M responseDefaultType, 
MessageConverter<M, T>
         messageConverter, IOExceptionConverter exceptionConverter, long 
rpcTimeout, int priority,
-      MetricsConnection.CallStats callStats) {
+      MetricsConnection metrics) {
     super(channel.getEventExecutor());
     this.channel = channel;
 
@@ -90,13 +93,15 @@ public class AsyncCall<M extends Message, T> extends 
Promise<T> {
     this.messageConverter = messageConverter;
     this.exceptionConverter = exceptionConverter;
 
-    this.startTime = EnvironmentEdgeManager.currentTime();
     this.rpcTimeout = rpcTimeout;
 
     this.priority = priority;
     this.cellScanner = cellScanner;
 
-    this.callStats = callStats;
+    this.callStats = MetricsConnection.newCallStats();
+    callStats.setStartTime(EnvironmentEdgeManager.currentTime());
+
+    this.clientMetrics = metrics;
   }
 
   /**
@@ -105,7 +110,7 @@ public class AsyncCall<M extends Message, T> extends 
Promise<T> {
    * @return start time for the call
    */
   public long getStartTime() {
-    return this.startTime;
+    return this.callStats.getStartTime();
   }
 
   @Override
@@ -122,9 +127,14 @@ public class AsyncCall<M extends Message, T> extends 
Promise<T> {
    * @param cellBlockScanner to set
    */
   public void setSuccess(M value, CellScanner cellBlockScanner) {
+    callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
callStats.getStartTime());
+
     if (LOG.isTraceEnabled()) {
-      long callTime = EnvironmentEdgeManager.currentTime() - startTime;
-      LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + 
"ms");
+      LOG.trace("Call: " + method.getName() + ", callTime: " + 
callStats.getCallTimeMs() + "ms");
+    }
+
+    if (clientMetrics != null) {
+      clientMetrics.updateRpc(method, param, callStats);
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index bd4be5a..8cc730f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -27,7 +27,6 @@ import java.net.InetSocketAddress;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Future;
-import org.apache.hadoop.hbase.client.MetricsConnection;
 
 /**
  * Interface for Async Rpc Channels
@@ -45,15 +44,13 @@ public interface AsyncRpcChannel {
    * @param exceptionConverter for converting exceptions
    * @param rpcTimeout timeout for request
    * @param priority for request
-   * @param callStats collects stats of the call
    * @return Promise for the response Message
    */
-
   <R extends Message, O> Future<O> callMethod(
       final Descriptors.MethodDescriptor method,
-      final Message request,final CellScanner cellScanner,
-      R responsePrototype, MessageConverter<R, O> messageConverter, 
IOExceptionConverter
-      exceptionConverter, long rpcTimeout, int priority, 
MetricsConnection.CallStats callStats);
+      final Message request, final CellScanner cellScanner,
+      R responsePrototype, MessageConverter<R, O> messageConverter,
+      IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);
 
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
index 5af2354..cd61b61 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
@@ -30,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.GenericFutureListener;
-
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Future;
-import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@@ -297,14 +295,16 @@ public class AsyncRpcChannelImpl implements 
AsyncRpcChannel {
    * @param priority for request
    * @return Promise for the response Message
    */
+  @Override
   public <R extends Message, O> Future<O> callMethod(
       final Descriptors.MethodDescriptor method,
       final Message request,final CellScanner cellScanner,
       R responsePrototype, MessageConverter<R, O> messageConverter, 
IOExceptionConverter
-      exceptionConverter, long rpcTimeout, int priority, 
MetricsConnection.CallStats callStats) {
+      exceptionConverter, long rpcTimeout, int priority) {
     final AsyncCall<R, O> call = new AsyncCall<>(this, 
client.callIdCnt.getAndIncrement(),
         method, request, cellScanner, responsePrototype, messageConverter, 
exceptionConverter,
-        rpcTimeout, priority, callStats);
+        rpcTimeout, priority, client.metrics);
+
     synchronized (pendingCalls) {
       if (closed) {
         call.setFailure(new ConnectException());

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 2fdc1ec..c1ed748 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -27,6 +27,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
@@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.ResponseFutureListener;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
@@ -103,7 +103,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
   @VisibleForTesting
   static Pair<EventLoopGroup, Class<? extends Channel>> 
GLOBAL_EVENT_LOOP_GROUP;
 
-  private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
+  synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
       getGlobalEventLoopGroup(Configuration conf) {
     if (GLOBAL_EVENT_LOOP_GROUP == null) {
       GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
@@ -241,8 +241,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
     final AsyncRpcChannel connection = 
createRpcChannel(md.getService().getName(), addr, ticket);
 
     final Future<Message> promise = connection.callMethod(md, param, 
pcrc.cellScanner(), returnType,
-        getMessageConverterWithRpcController(pcrc), null, 
pcrc.getCallTimeout(), pcrc.getPriority(),
-        callStats);
+        getMessageConverterWithRpcController(pcrc), null, 
pcrc.getCallTimeout(),
+        pcrc.getPriority());
 
     pcrc.notifyOnCancel(new RpcCallback<Object>() {
       @Override
@@ -289,19 +289,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
     final AsyncRpcChannel connection;
     try {
       connection = createRpcChannel(md.getService().getName(), addr, ticket);
-      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
 
       ResponseFutureListener<Message> listener =
         new ResponseFutureListener<Message>() {
           @Override
           public void operationComplete(Future<Message> future) throws 
Exception {
-            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
cs.getStartTime());
-            if (metrics != null) {
-              metrics.updateRpc(md, param, cs);
-            }
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Call: " + md.getName() + ", callTime: " + 
cs.getCallTimeMs() + "ms");
-            }
             if (!future.isSuccess()) {
               Throwable cause = future.cause();
               if (cause instanceof IOException) {
@@ -325,10 +317,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
             }
           }
         };
-      cs.setStartTime(EnvironmentEdgeManager.currentTime());
       connection.callMethod(md, param, pcrc.cellScanner(), returnType,
           getMessageConverterWithRpcController(pcrc), null,
-          pcrc.getCallTimeout(), pcrc.getPriority(), cs)
+          pcrc.getCallTimeout(), pcrc.getPriority())
           .addListener(listener);
     } catch (StoppedRpcClientException|FailedServerException e) {
       pcrc.setFailed(e);
@@ -360,6 +351,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
   }
 
+  /**
+   * Hands out an event loop taken from the group this client's bootstrap runs on.
+   * @return the event loop returned by the group's <code>next()</code>
+   */
+  @Override
+  public EventLoop getEventExecutor() {
+    EventLoopGroup workerGroup = this.bootstrap.group();
+    return workerGroup.next();
+  }
+
   /**
    * Create a cell scanner
    *
@@ -382,10 +378,17 @@ public class AsyncRpcClient extends AbstractRpcClient {
     return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
   }
 
+  /**
+   * Builds the socket address from the host name and port of the given server and delegates
+   * to the address based <code>createRpcChannel</code> overload.
+   */
+  @Override
+  public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
+      throws StoppedRpcClientException, FailedServerException {
+    InetSocketAddress location = new InetSocketAddress(sn.getHostname(), sn.getPort());
+    return this.createRpcChannel(serviceName, location, user);
+  }
+
   /**
    * Creates an RPC client
    *
-   * @param serviceName    name of servicce
+   * @param serviceName    name of service
    * @param location       to connect to
    * @param ticket         for current user
    * @return new RpcChannel
@@ -452,6 +455,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
 
   /**
    * Remove connection from pool
+   * @param connection to remove
    */
   public void removeConnection(AsyncRpcChannel connection) {
     int connectionHashCode = connection.hashCode();
@@ -469,17 +473,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
   }
 
-  /**
-   * Creates a "channel" that can be used by a protobuf service.  Useful 
setting up
-   * protobuf stubs.
-   *
-   * @param sn server name describing location of server
-   * @param user which is to use the connection
-   * @param rpcTimeout default rpc operation timeout
-   *
-   * @return A rpc channel that goes via this rpc client instance.
-   */
-  public RpcChannel createRpcChannel(final ServerName sn, final User user, int 
rpcTimeout) {
+  @Override
+  public RpcChannel createProtobufRpcChannel(final ServerName sn, final User 
user, int rpcTimeout) {
     return new RpcChannelImplementation(this, sn, user, rpcTimeout);
   }
 
@@ -507,21 +502,20 @@ public class AsyncRpcClient extends AbstractRpcClient {
     @Override
     public void callMethod(Descriptors.MethodDescriptor md, RpcController 
controller,
         Message param, Message returnType, RpcCallback<Message> done) {
-      PayloadCarryingRpcController pcrc;
-      if (controller != null) {
-        pcrc = (PayloadCarryingRpcController) controller;
-        if (!pcrc.hasCallTimeout()) {
-          pcrc.setCallTimeout(channelOperationTimeout);
-        }
-      } else {
-        pcrc = new PayloadCarryingRpcController();
-        pcrc.setCallTimeout(channelOperationTimeout);
-      }
+      PayloadCarryingRpcController pcrc =
+          configurePayloadCarryingRpcController(controller, 
channelOperationTimeout);
 
       this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, 
this.isa, done);
     }
   }
 
+  /**
+   * Schedule a task on the <code>WHEEL_TIMER</code> used by this RPC client
+   * @param task to run at timeout
+   * @param delay to wait before the task is run
+   * @param unit time unit of the delay
+   * @return Timeout handle returned by the timer
+   */
   Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
     return WHEEL_TIMER.newTimeout(task, delay, unit);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
index 527ac95..a85225a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
@@ -41,7 +41,7 @@ public interface MessageConverter<M,O> {
   MessageConverter<Message,Message> NO_CONVERTER = new 
MessageConverter<Message, Message>() {
     @Override
     public Message convert(Message msg, CellScanner cellScanner) throws 
IOException {
-      return null;
+      return msg;
     }
   };
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index f77f1ec..9d05c21 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -18,7 +18,8 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.BlockingRpcChannel;
-
+import com.google.protobuf.RpcChannel;
+import io.netty.util.concurrent.EventExecutor;
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -69,6 +70,30 @@ import org.apache.hadoop.hbase.security.User;
       throws IOException;
 
   /**
+   * Create or fetch an {@link AsyncRpcChannel} for the given service, server and user.
+   * @param serviceName name of the service to connect to
+   * @param sn ServerName of the server the channel is created for
+   * @param user user the channel is created for
+   * @return An async RPC channel fitting the given parameters
+   * @throws FailedServerException if server failed
+   * @throws StoppedRpcClientException if the RPC client has stopped
+   */
+  AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
+      throws StoppedRpcClientException, FailedServerException;
+
+  /**
+   * Creates a "channel" that can be used by a protobuf service. Useful for setting up
+   * protobuf stubs.
+   *
+   * @param sn server name describing location of server
+   * @param user user that is to use the connection
+   * @param rpcTimeout default rpc operation timeout
+   *
+   * @return An RPC channel that goes via this rpc client instance.
+   */
+  RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout);
+
+  /**
    * Interrupt the connections to the given server. This should be called if 
the server
    * is known as actually dead. This will not prevent current operation to be 
retried, and,
    * depending on their own behavior, they may retry on the same server. This 
can be a feature,
@@ -91,4 +116,10 @@ import org.apache.hadoop.hbase.security.User;
    *         supports cell blocks.
    */
   boolean hasCellBlockSupport();
+
+  /**
+   * Get an event executor to operate on
+   * @return an EventExecutor
+   */
+  EventExecutor getEventExecutor();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 7b2500c..a5d2482 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -22,6 +22,9 @@ import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
 import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+import io.netty.util.concurrent.EventExecutor;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -51,7 +54,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.net.SocketFactory;
 import javax.security.sasl.SaslException;
 
@@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
@@ -118,9 +121,7 @@ public class RpcClientImpl extends AbstractRpcClient {
   protected final SocketFactory socketFactory;           // how to create 
sockets
 
   protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
-      TokenSelector<? extends TokenIdentifier>> tokenHandlers =
-      new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
-        TokenSelector<? extends TokenIdentifier>>();
+      TokenSelector<? extends TokenIdentifier>> tokenHandlers = new 
HashMap<>();
   static {
     
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
         new AuthenticationTokenSelector());
@@ -1217,7 +1218,13 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
   }
 
-  /** Make a call, passing <code>param</code>, to the IPC server running at
+  /**
+   * Takes the next executor from the global event loop group that {@link AsyncRpcClient}
+   * creates for the configuration of this client.
+   * @return an executor of the global event loop group
+   */
+  @Override
+  public EventExecutor getEventExecutor() {
+    return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next();
+  }
+
+  /**
+   * Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> 
protocol,
    * with the <code>ticket</code> credentials, returning the value.
    * Throws exceptions if there are network problems or if the remote code
@@ -1226,7 +1233,7 @@ public class RpcClientImpl extends AbstractRpcClient {
    *          {@link UserProvider#getCurrent()} makes a new instance of User 
each time so will be a
    *          new Connection each time.
    * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException if the call is interupted
+   * @throws InterruptedException if the call is interrupted
    * @throws IOException if something fails on the connection
    */
   @Override
@@ -1237,10 +1244,35 @@ public class RpcClientImpl extends AbstractRpcClient {
     if (pcrc == null) {
       pcrc = new PayloadCarryingRpcController();
     }
+
+    Call call = this.call(md, param, returnType, pcrc, ticket, addr, 
callStats);
+
+    return new Pair<>(call.response, call.cells);
+  }
+
+
+  /**
+   * Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> 
protocol,
+   * with the <code>ticket</code> credentials, returning the value.
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   * @param ticket Be careful which ticket you pass. A new user will mean a 
new Connection.
+   *          {@link UserProvider#getCurrent()} makes a new instance of User 
each time so will be a
+   *          new Connection each time.
+   * @return A Call
+   * @throws InterruptedException if the call is interrupted
+   * @throws IOException if something fails on the connection
+   */
+  private <R extends Message> Call call(MethodDescriptor method, Message 
request,
+      R responsePrototype, PayloadCarryingRpcController pcrc, User ticket,
+      InetSocketAddress addr, MetricsConnection.CallStats callStats)
+      throws IOException, InterruptedException {
+
     CellScanner cells = pcrc.cellScanner();
 
-    final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, 
cells, returnType,
-        pcrc.getCallTimeout(), MetricsConnection.newCallStats());
+    final Call call = new Call(callIdCnt.getAndIncrement(), method, request, 
cells,
+        responsePrototype, pcrc.getCallTimeout(), callStats);
 
     final Connection connection = getConnection(ticket, call, addr);
 
@@ -1256,7 +1288,7 @@ public class RpcClientImpl extends AbstractRpcClient {
       if (pcrc.isCanceled()) {
         // To finish if the call was cancelled before we set the notification 
(race condition)
         call.callComplete();
-        return new Pair<>(call.response, call.cells);
+        return call;
       }
     } else {
       cts = null;
@@ -1299,9 +1331,19 @@ public class RpcClientImpl extends AbstractRpcClient {
       throw wrapException(addr, call.error);
     }
 
-    return new Pair<>(call.response, call.cells);
+    return call;
   }
 
+  /**
+   * Returns a new {@link AsyncRpcChannel} wrapper for the given server and user on every
+   * invocation. The service name is not used by this implementation.
+   */
+  @Override
+  public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName,
+      ServerName sn, User user) throws StoppedRpcClientException, FailedServerException {
+    return new AsyncRpcChannel(sn, user);
+  }
+
+  /**
+   * Creates a protobuf {@link RpcChannel} that executes its calls through this client.
+   * @param sn server the channel sends its calls to
+   * @param user user the calls are made for
+   * @param rpcTimeout default timeout used when a call carries none of its own
+   * @return a new channel instance
+   */
+  @Override
+  public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
+    return new RpcChannelImplementation(sn, user, rpcTimeout);
+  }
 
   /**
    * Interrupt the connections to the given ip:port server. This should be 
called if the server
@@ -1349,4 +1391,143 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     return connection;
   }
+
+  /**
+   * Simulated async call: a protobuf {@link RpcChannel} on top of this client. Each call is
+   * handed to an {@link EventExecutor}, where it is executed through
+   * <code>call(...)</code>; the outcome is reported through the controller and the
+   * <code>done</code> callback.
+   */
+  private class RpcChannelImplementation implements RpcChannel {
+    private final InetSocketAddress isa;
+    private final User ticket;
+    private final int channelOperationTimeout;
+    private final EventExecutor executor;
+
+    /**
+     * @param sn server the calls of this channel are sent to
+     * @param ticket user the calls are made for
+     * @param channelOperationTimeout - the default timeout when no timeout is given
+     */
+    protected RpcChannelImplementation(
+        final ServerName sn, final User ticket, int channelOperationTimeout) {
+      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
+      this.ticket = ticket;
+      this.channelOperationTimeout = channelOperationTimeout;
+
+      this.executor = RpcClientImpl.this.getEventExecutor();
+    }
+
+    /**
+     * Runs the call on the executor and always finishes by invoking <code>done</code>: with
+     * the response on success, with <code>null</code> after the controller has been marked
+     * as failed or cancelled.
+     * @param method method to call
+     * @param controller controller of the caller; configured with the default timeout of
+     *          this channel when it carries none
+     * @param request request message
+     * @param responsePrototype prototype of the expected response
+     * @param done callback that receives the response, or null when the call did not succeed
+     */
+    @Override
+    public void callMethod(final MethodDescriptor method, RpcController controller,
+        final Message request, final Message responsePrototype, final RpcCallback<Message> done) {
+      final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
+          controller,
+          channelOperationTimeout);
+
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          Message response = null;
+          try {
+            final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+            cs.setStartTime(EnvironmentEdgeManager.currentTime());
+            Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
+            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+            if (metrics != null) {
+              metrics.updateRpc(method, request, cs);
+            }
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+            }
+
+            response = call.response;
+          } catch (IOException e) {
+            pcrc.setFailed(e);
+          } catch (InterruptedException e) {
+            pcrc.startCancel();
+          }
+
+          // Invoked on every path: a caller waiting on the callback would otherwise never be
+          // told that the call failed or was cancelled. The controller carries the details.
+          // NOTE(review): call.cells is not handed to the controller here - confirm whether
+          // callers of this channel expect the controller to carry the response cells.
+          done.run(response);
+        }
+      });
+    }
+  }
+
+  /**
+   * Wraps the call in an async channel. Each call is handed to an {@link EventExecutor},
+   * where it is executed through <code>call(...)</code>; the returned promise is completed
+   * with the converted response, failed with the (converted) exception or cancelled when the
+   * call is interrupted.
+   */
+  private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel {
+    private final EventExecutor executor;
+    private final InetSocketAddress isa;
+
+    private final User ticket;
+
+    // Set by close(). The executor is borrowed from the event loop group handed out by
+    // RpcClientImpl.getEventExecutor() and is not owned by this channel, so closing the
+    // channel must not shut the executor down.
+    private volatile boolean closed = false;
+
+    /**
+     * Constructor
+     * @param sn servername to connect to
+     * @param user to connect with
+     */
+    public AsyncRpcChannel(ServerName sn, User user) {
+      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
+      this.executor = RpcClientImpl.this.getEventExecutor();
+      this.ticket = user;
+    }
+
+    /**
+     * Executes the call on the executor of this channel.
+     * @param method method to call
+     * @param request request message
+     * @param cellScanner cells to send along with the request
+     * @param responsePrototype prototype of the expected response
+     * @param messageConverter converts the response message into the promised value
+     * @param exceptionConverter converts IOExceptions before they fail the promise, can be null
+     * @param rpcTimeout timeout for the call; values above Integer.MAX_VALUE are clamped
+     * @param priority priority of the request
+     * @return promise for the converted response; already failed when the channel is closed
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <R extends Message, O> Future<O> callMethod(final MethodDescriptor method,
+        final Message request, CellScanner cellScanner, final R responsePrototype,
+        final MessageConverter<R, O> messageConverter,
+        final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) {
+      final Promise<O> promise = new Promise<>(executor);
+      if (closed) {
+        promise.setFailure(new ConnectionClosingException("Channel to " + isa + " is closed"));
+        return promise;
+      }
+
+      final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner);
+      pcrc.setPriority(priority);
+      // The controller takes an int; clamp instead of letting the narrowing cast wrap around.
+      pcrc.setCallTimeout((int) Math.min(Integer.MAX_VALUE, rpcTimeout));
+
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+            cs.setStartTime(EnvironmentEdgeManager.currentTime());
+            Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
+            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+            if (metrics != null) {
+              metrics.updateRpc(method, request, cs);
+            }
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+            }
+
+            O converted = messageConverter.convert((R) call.response, call.cells);
+            promise.setSuccess(converted);
+          } catch (InterruptedException e) {
+            promise.cancel(true);
+          } catch (IOException e) {
+            if (exceptionConverter != null) {
+              e = exceptionConverter.convert(e);
+            }
+            promise.setFailure(e);
+          } catch (RuntimeException e) {
+            // Without this an unchecked exception would leave the promise uncompleted and
+            // its waiters blocked.
+            promise.setFailure(e);
+          }
+        }
+      });
+
+      return promise;
+    }
+
+    @Override
+    public EventExecutor getEventExecutor() {
+      return this.executor;
+    }
+
+    /**
+     * Marks this channel as closed. The executor is left running.
+     * @param cause reason for closing, not used
+     */
+    @Override
+    public void close(Throwable cause) {
+      this.closed = true;
+    }
+
+    /**
+     * @return true while this channel has not been closed and its executor is still running
+     */
+    @Override
+    public boolean isAlive() {
+      return !closed && !this.executor.isShuttingDown() && !this.executor.isShutdown();
+    }
+
+    @Override
+    public InetSocketAddress getAddress() {
+      return isa;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 69c8fe2..ceb945b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
@@ -25,6 +26,15 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -32,7 +42,9 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,9 +52,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -60,15 +75,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Some basic ipc tests.
  */
@@ -76,8 +82,10 @@ public abstract class AbstractTestIPC {
 
   private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
 
-  private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
-  private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, 
CELL_BYTES, CELL_BYTES);
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
+  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, 
CELL_BYTES, CELL_BYTES);
   static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
   static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, 
BIG_CELL_BYTES);
   static final Configuration CONF = HBaseConfiguration.create();
@@ -114,7 +122,7 @@ public abstract class AbstractTestIPC {
                 CellScanner cellScanner = pcrc.cellScanner();
                 List<Cell> list = null;
                 if (cellScanner != null) {
-                  list = new ArrayList<Cell>();
+                  list = new ArrayList<>();
                   try {
                     while (cellScanner.advance()) {
                       list.add(cellScanner.current());
@@ -168,9 +176,8 @@ public abstract class AbstractTestIPC {
   @Test
   public void testNoCodec() throws InterruptedException, IOException {
     Configuration conf = HBaseConfiguration.create();
-    AbstractRpcClient client = createRpcClientNoCodec(conf);
     TestRpcServer rpcServer = new TestRpcServer();
-    try {
+    try (AbstractRpcClient client = createRpcClientNoCodec(conf)) {
       rpcServer.start();
       MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
       final String message = "hello";
@@ -186,7 +193,6 @@ public abstract class AbstractTestIPC {
       // Silly assertion that the message is in the returned pb.
       assertTrue(r.getFirst().toString().contains(message));
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
@@ -207,14 +213,13 @@ public abstract class AbstractTestIPC {
       NoSuchMethodException, ServiceException {
     Configuration conf = new Configuration(HBaseConfiguration.create());
     conf.set("hbase.client.rpc.compressor", 
GzipCodec.class.getCanonicalName());
-    List<Cell> cells = new ArrayList<Cell>();
+    List<Cell> cells = new ArrayList<>();
     int count = 3;
     for (int i = 0; i < count; i++) {
       cells.add(CELL);
     }
-    AbstractRpcClient client = createRpcClient(conf);
     TestRpcServer rpcServer = new TestRpcServer();
-    try {
+    try (AbstractRpcClient client = createRpcClient(conf)) {
       rpcServer.start();
       MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -234,7 +239,6 @@ public abstract class AbstractTestIPC {
       }
       assertEquals(count, index);
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
@@ -246,8 +250,7 @@ public abstract class AbstractTestIPC {
   public void testRTEDuringConnectionSetup() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
-    AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
-    try {
+    try (AbstractRpcClient client = 
createRpcClientRTEDuringConnectionSetup(conf)) {
       rpcServer.start();
       MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -262,7 +265,6 @@ public abstract class AbstractTestIPC {
       LOG.info("Caught expected exception: " + e.toString());
       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
@@ -332,7 +334,7 @@ public abstract class AbstractTestIPC {
    */
   static class TestRpcServer1 extends RpcServer {
 
-    private static BlockingInterface SERVICE1 =
+    private static final BlockingInterface SERVICE1 =
         new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
           @Override
           public EmptyResponseProto ping(RpcController unused, 
EmptyRequestProto request)
@@ -378,26 +380,22 @@ public abstract class AbstractTestIPC {
     final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
     final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
     final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
-    final AbstractRpcClient client =
-        new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, 
null);
-    try {
+    try (AbstractRpcClient client = new RpcClientImpl(CONF, 
HConstants.CLUSTER_ID_DEFAULT,
+        localAddr, null)) {
       rpcServer.start();
       final InetSocketAddress isa = rpcServer.getListenerAddress();
       if (isa == null) {
         throw new IOException("Listener channel is closed");
       }
-      final BlockingRpcChannel channel =
-          client.createBlockingRpcChannel(
-            ServerName.valueOf(isa.getHostName(), isa.getPort(), 
System.currentTimeMillis()),
-            User.getCurrent(), 0);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      final BlockingRpcChannel channel = client.createBlockingRpcChannel(
+          ServerName.valueOf(isa.getHostName(), isa.getPort(), 
System.currentTimeMillis()),
+          User.getCurrent(), 0);
+      BlockingInterface stub = 
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
       final EchoRequestProto echoRequest =
           EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
       final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
       Assert.assertEquals(localAddr.getAddress().getHostAddress(), 
echoResponse.getMessage());
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
@@ -416,4 +414,141 @@ public abstract class AbstractTestIPC {
         .wrapException(address, new CallTimeoutException("Test 
AbstractRpcClient#wrapException"))
         .getCause() instanceof CallTimeoutException);
   }
+
+  @Test
+  public void testAsyncProtobufConnectionSetup() throws Exception { // Echo call via the protobuf RpcChannel must invoke its callback.
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (RpcClient client = createRpcClient(CONF)) {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      RpcChannel channel = client.createProtobufRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), 
System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      final AtomicBoolean done = new AtomicBoolean(false); // Flipped to true by the response callback below.
+
+      channel
+          .callMethod(md, new PayloadCarryingRpcController(), param, 
md.getOutputType().toProto(),
+              new com.google.protobuf.RpcCallback<Message>() {
+                @Override
+                public void run(Message parameter) { // The response content itself is not inspected.
+                  done.set(true);
+                }
+              });
+
+      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { // Polls 'done'; timeout argument is 1000 (presumably ms - confirm).
+        @Override
+        public boolean evaluate() throws Exception {
+          return done.get();
+        }
+      });
+    } finally { // try-with-resources has already closed the client when this runs.
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception { // Expects the failure callback, not the response callback, to fire.
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      RpcChannel channel = client.createProtobufRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), 
System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      final AtomicBoolean done = new AtomicBoolean(false); // Set by whichever callback runs first.
+
+      PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController();
+      controller.notifyOnFail(new 
com.google.protobuf.RpcCallback<IOException>() {
+        @Override
+        public void run(IOException e) { // Failure path: the outcome this test expects.
+          done.set(true);
+          LOG.info("Caught expected exception: " + e.toString());
+          assertTrue(StringUtils.stringifyException(e).contains("Injected 
fault"));
+        }
+      });
+
+      channel.callMethod(md, controller, param, md.getOutputType().toProto(),
+          new com.google.protobuf.RpcCallback<Message>() {
+            @Override
+            public void run(Message parameter) { // Success path: not expected in this test.
+              done.set(true);
+              fail("Expected an exception to have been thrown!"); // NOTE(review): if callbacks run off the test thread, this may not fail the test - confirm.
+            }
+          });
+
+      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() { // Polls 'done'; timeout argument is 1000 (presumably ms - confirm).
+        @Override
+        public boolean evaluate() throws Exception {
+          return done.get();
+        }
+      });
+    } finally { // try-with-resources has already closed the client when this runs.
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncConnectionSetup() throws Exception { // An echo call over the AsyncRpcChannel must yield a non-null message.
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (RpcClient client = createRpcClient(CONF)) {
+      rpcServer.start();
+      Message msg = setupAsyncConnection(rpcServer, client); // Performs the call and waits for its result.
+
+      assertNotNull(msg);
+    } finally { // try-with-resources has already closed the client when this runs.
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testRTEDuringAsyncConnectionSetup() throws Exception { // The async call must fail with the injected fault.
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
+      rpcServer.start();
+      setupAsyncConnection(rpcServer, client); // Expected to throw; the returned message is ignored.
+
+      fail("Expected an exception to have been thrown!"); // Reached only if no exception was raised above.
+    } catch (ExecutionException e) { // Failure surfaced through the future's get().
+      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient 
client)
+      throws IOException, InterruptedException, ExecutionException,
+      java.util.concurrent.TimeoutException { // Sends one "echo" request through an AsyncRpcChannel and returns the reply.
+    InetSocketAddress address = rpcServer.getListenerAddress();
+    if (address == null) { // No listener address available: report it as an IOException.
+      throw new IOException("Listener channel is closed");
+    }
+    MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
+    EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
+
+    ServerName serverName =
+        ServerName.valueOf(address.getHostName(), address.getPort(), 
System.currentTimeMillis());
+
+    AsyncRpcChannel channel =
+        client.createRpcChannel(md.getService().getName(), serverName, 
User.getCurrent());
+
+    final Future<Message> f = channel
+        .callMethod(md, param, null, md.getOutputType().toProto(), 
MessageConverter.NO_CONVERTER,
+            null, 1000, HConstants.NORMAL_QOS);
+
+    return f.get(1, TimeUnit.SECONDS); // Waits at most one second for the future to complete.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index b9d390a..7efe198 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.MethodDescriptor;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -32,7 +32,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,12 +39,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
@@ -60,30 +56,20 @@ import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-
 @RunWith(Parameterized.class)
 @Category({ RPCTests.class, SmallTests.class })
 public class TestAsyncIPC extends AbstractTestIPC {
 
   private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
 
-  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-
   @Parameters
   public static Collection<Object[]> parameters() {
-    List<Object[]> paramList = new ArrayList<Object[]>();
+    List<Object[]> paramList = new ArrayList<>();
     paramList.add(new Object[] { false, false });
     paramList.add(new Object[] { false, true });
     paramList.add(new Object[] { true, false });
@@ -150,95 +136,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
         });
   }
 
-  @Test
-  public void testAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    AsyncRpcClient client = createRpcClient(CONF);
-    try {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel =
-          client.createRpcChannel(ServerName.valueOf(address.getHostName(), 
address.getPort(),
-            System.currentTimeMillis()), User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      channel.callMethod(md, new PayloadCarryingRpcController(), param, 
md.getOutputType()
-          .toProto(), new RpcCallback<Message>() {
-        @Override
-        public void run(Message parameter) {
-          done.set(true);
-        }
-      });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testRTEDuringAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
-    try {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel =
-          client.createRpcChannel(ServerName.valueOf(address.getHostName(), 
address.getPort(),
-            System.currentTimeMillis()), User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController();
-      controller.notifyOnFail(new RpcCallback<IOException>() {
-        @Override
-        public void run(IOException e) {
-          done.set(true);
-          LOG.info("Caught expected exception: " + e.toString());
-          assertTrue(StringUtils.stringifyException(e).contains("Injected 
fault"));
-        }
-      });
-
-      channel.callMethod(md, controller, param, md.getOutputType().toProto(),
-        new RpcCallback<Message>() {
-          @Override
-          public void run(Message parameter) {
-            done.set(true);
-            fail("Expected an exception to have been thrown!");
-          }
-        });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
   public static void main(String[] args) throws IOException, SecurityException,
       NoSuchMethodException, InterruptedException {
     if (args.length != 2) {
@@ -253,7 +150,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
     TestRpcServer rpcServer = new TestRpcServer();
     MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
     EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
-    AsyncRpcClient client = new AsyncRpcClient(conf);
     KeyValue kv = BIG_CELL;
     Put p = new Put(CellUtil.cloneRow(kv));
     for (int i = 0; i < cellcount; i++) {
@@ -261,7 +157,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
     }
     RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
     rm.add(p);
-    try {
+    try (AsyncRpcClient client = new AsyncRpcClient(conf)) {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
       if (address == null) {
@@ -270,17 +166,17 @@ public class TestAsyncIPC extends AbstractTestIPC {
       long startTime = System.currentTimeMillis();
       User user = User.getCurrent();
       for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<CellScannable>();
+        List<CellScannable> cells = new ArrayList<>();
         // Message param = 
RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
         ClientProtos.RegionAction.Builder builder =
             
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
-              RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
-              MutationProto.newBuilder());
+                RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
+                MutationProto.newBuilder());
         builder.setRegion(RegionSpecifier
             .newBuilder()
             .setType(RegionSpecifierType.REGION_NAME)
             .setValue(
-              
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
+                
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
         if (i % 100000 == 0) {
           LOG.info("" + i);
           // Uncomment this for a thread dump every so often.
@@ -300,7 +196,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) 
in "
           + (System.currentTimeMillis() - startTime) + "ms");
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11091c4/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 3fc1259..56de07d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -22,6 +22,8 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.MethodDescriptor;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -59,9 +61,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-
 @Category({ RPCTests.class, SmallTests.class })
 public class TestIPC extends AbstractTestIPC {
 
@@ -129,7 +128,7 @@ public class TestIPC extends AbstractTestIPC {
         throw new IOException("Listener channel is closed");
       }
       for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<CellScannable>();
+        List<CellScannable> cells = new ArrayList<>();
         // Message param = 
RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
         ClientProtos.RegionAction.Builder builder =
             
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,

Reply via email to