Repository: hbase
Updated Branches:
  refs/heads/master ac31ceb83 -> fa033b6a0


HBASE-15793 Port over AsyncCall improvements
Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fa033b6a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fa033b6a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fa033b6a

Branch: refs/heads/master
Commit: fa033b6a08020282004ec87353f743fc205140ef
Parents: ac31ceb
Author: Jurriaan Mous <jurm...@jurmo.us>
Authored: Sat May 7 12:46:58 2016 +0200
Committer: stack <st...@apache.org>
Committed: Sat May 7 10:38:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Future.java  |  31 ++++++
 .../hbase/client/ResponseFutureListener.java    |  30 ++++++
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |  87 ++++++++++++----
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  21 +++-
 .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java   |  50 ++++-----
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 101 ++++++++++++-------
 .../hadoop/hbase/ipc/IOExceptionConverter.java  |  34 +++++++
 .../hadoop/hbase/ipc/MessageConverter.java      |  47 +++++++++
 .../org/apache/hadoop/hbase/ipc/Promise.java    |  38 +++++++
 9 files changed, 346 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
new file mode 100644
index 0000000..1247fd4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Future for a response; extends Netty's Future and declares no members of its own.
+ * @param <V> Type of the value the future completes with
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface Future<V> extends io.netty.util.concurrent.Future<V> {
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
new file mode 100644
index 0000000..f23dc8f
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Netty future listener specialized for a response {@link Future}.
+ * @param <V> Value type of the future being listened to.
+ */
+@InterfaceAudience.Private
+public interface ResponseFutureListener<V>
+    extends GenericFutureListener<Future<V>> {
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index a5da0dc..3acf280 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
-import io.netty.channel.EventLoop;
-import io.netty.util.concurrent.DefaultPromise;
+import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
@@ -31,51 +30,72 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
 
-import java.io.IOException;
-
 /**
  * Represents an Async Hbase call and its response.
  *
  * Responses are passed on to its given doneHandler and failures to the 
rpcController
+ *
+ * @param <M> Response message type, before conversion
+ * @param <T> Type the response message is converted to; the value type of this promise
  */
 @InterfaceAudience.Private
-public class AsyncCall extends DefaultPromise<Message> {
+public class AsyncCall<M extends Message, T> extends Promise<T> {
   private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
 
   final int id;
 
+  private final AsyncRpcChannelImpl channel;
+
   final Descriptors.MethodDescriptor method;
   final Message param;
-  final PayloadCarryingRpcController controller;
   final Message responseDefaultType;
+
+  private final MessageConverter<M,T> messageConverter;
   final long startTime;
   final long rpcTimeout;
+  private final IOExceptionConverter exceptionConverter;
+
+  // For only the request
+  private final CellScanner cellScanner;
+  private final int priority;
+
   final MetricsConnection.CallStats callStats;
 
   /**
    * Constructor
    *
-   * @param eventLoop           for call
+   * @param channel             which initiated call
    * @param connectId           connection id
    * @param md                  the method descriptor
    * @param param               parameters to send to Server
-   * @param controller          controller for response
+   * @param cellScanner         cellScanner containing cells to send as request
    * @param responseDefaultType the default response type
+   * @param messageConverter    converts the messages to what is the expected 
output
+   * @param rpcTimeout          timeout for this call in ms
+   * @param priority            for this request
    */
-  public AsyncCall(EventLoop eventLoop, int connectId, 
Descriptors.MethodDescriptor md, Message
-      param, PayloadCarryingRpcController controller, Message 
responseDefaultType,
+  public AsyncCall(AsyncRpcChannelImpl channel, int connectId, 
Descriptors.MethodDescriptor
+        md, Message param, CellScanner cellScanner, M responseDefaultType, 
MessageConverter<M, T>
+        messageConverter, IOExceptionConverter exceptionConverter, long 
rpcTimeout, int priority,
       MetricsConnection.CallStats callStats) {
-    super(eventLoop);
+    super(channel.getEventExecutor());
+    this.channel = channel;
 
     this.id = connectId;
 
     this.method = md;
     this.param = param;
-    this.controller = controller;
     this.responseDefaultType = responseDefaultType;
 
+    this.messageConverter = messageConverter;
+    this.exceptionConverter = exceptionConverter;
+
     this.startTime = EnvironmentEdgeManager.currentTime();
-    this.rpcTimeout = controller.hasCallTimeout() ? 
controller.getCallTimeout() : 0;
+    this.rpcTimeout = rpcTimeout;
+
+    this.priority = priority;
+    this.cellScanner = cellScanner;
+
     this.callStats = callStats;
   }
 
@@ -101,17 +121,19 @@ public class AsyncCall extends DefaultPromise<Message> {
    * @param value            to set
    * @param cellBlockScanner to set
    */
-  public void setSuccess(Message value, CellScanner cellBlockScanner) {
-    if (cellBlockScanner != null) {
-      controller.setCellScanner(cellBlockScanner);
-    }
-
+  public void setSuccess(M value, CellScanner cellBlockScanner) {
     if (LOG.isTraceEnabled()) {
       long callTime = EnvironmentEdgeManager.currentTime() - startTime;
       LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + 
"ms");
     }
 
-    this.setSuccess(value);
+    try {
+      this.setSuccess(
+          this.messageConverter.convert(value, cellBlockScanner)
+      );
+    } catch (IOException e) {
+      this.setFailed(e);
+    }
   }
 
   /**
@@ -127,6 +149,10 @@ public class AsyncCall extends DefaultPromise<Message> {
       exception = ((RemoteException) exception).unwrapRemoteException();
     }
 
+    if (this.exceptionConverter != null) {
+      exception = this.exceptionConverter.convert(exception);
+    }
+
     this.setFailure(exception);
   }
 
@@ -138,4 +164,27 @@ public class AsyncCall extends DefaultPromise<Message> {
   public long getRpcTimeout() {
     return rpcTimeout;
   }
+
+
+  /**
+   * @return Priority for this call
+   */
+  public int getPriority() {
+    return priority;
+  }
+
+  /**
+   * Get the cellScanner for this request.
+   * @return CellScanner
+   */
+  public CellScanner cellScanner() {
+    return cellScanner;
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterupt){
+    this.channel.removePendingCall(this.id);
+    return super.cancel(mayInterupt);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 60dc5e4..bd4be5a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -21,11 +21,12 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 
 import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Promise;
 
 import java.net.InetSocketAddress;
 
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 
 /**
@@ -37,13 +38,23 @@ public interface AsyncRpcChannel {
   /**
    * Calls method on channel
    * @param method to call
-   * @param controller to run call with
    * @param request to send
+   * @param cellScanner with cells to send
    * @param responsePrototype to construct response with
+   * @param messageConverter for the messages to expected result
+   * @param exceptionConverter for converting exceptions
+   * @param rpcTimeout timeout for request
+   * @param priority for request
+   * @param callStats collects stats of the call
+   * @return Future for the response, as produced by messageConverter
    */
-  Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
-      final PayloadCarryingRpcController controller, final Message request,
-      final Message responsePrototype, MetricsConnection.CallStats callStats);
+
+  <R extends Message, O> Future<O> callMethod(
+      final Descriptors.MethodDescriptor method,
+      final Message request,final CellScanner cellScanner,
+      R responsePrototype, MessageConverter<R, O> messageConverter, 
IOExceptionConverter
+      exceptionConverter, long rpcTimeout, int priority, 
MetricsConnection.CallStats callStats);
+
 
   /**
    * Get the EventLoop on which this channel operated

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
index 7cc9e78..5af2354 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
@@ -32,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.net.ConnectException;
@@ -51,8 +48,10 @@ import javax.security.sasl.SaslException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -291,36 +290,25 @@ public class AsyncRpcChannelImpl implements 
AsyncRpcChannel {
   /**
    * Calls method on channel
    * @param method to call
-   * @param controller to run call with
    * @param request to send
+   * @param cellScanner with cells to send
    * @param responsePrototype to construct response with
+   * @param rpcTimeout timeout for request
+   * @param priority for request
+   * @return Future for the response, as produced by messageConverter
    */
-  public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
-      final PayloadCarryingRpcController controller, final Message request,
-      final Message responsePrototype, MetricsConnection.CallStats callStats) {
-    final AsyncCall call = new AsyncCall(channel.eventLoop(), 
client.callIdCnt.getAndIncrement(),
-        method, request, controller, responsePrototype, callStats);
-    controller.notifyOnCancel(new RpcCallback<Object>() {
-      @Override
-      public void run(Object parameter) {
-        // TODO: do not need to call AsyncCall.setFailed?
-        synchronized (pendingCalls) {
-          pendingCalls.remove(call.id);
-        }
-      }
-    });
-    // TODO: this should be handled by PayloadCarryingRpcController.
-    if (controller.isCanceled()) {
-      // To finish if the call was cancelled before we set the notification 
(race condition)
-      call.cancel(true);
-      return call;
-    }
-
+  public <R extends Message, O> Future<O> callMethod(
+      final Descriptors.MethodDescriptor method,
+      final Message request,final CellScanner cellScanner,
+      R responsePrototype, MessageConverter<R, O> messageConverter, 
IOExceptionConverter
+      exceptionConverter, long rpcTimeout, int priority, 
MetricsConnection.CallStats callStats) {
+    final AsyncCall<R, O> call = new AsyncCall<>(this, 
client.callIdCnt.getAndIncrement(),
+        method, request, cellScanner, responsePrototype, messageConverter, 
exceptionConverter,
+        rpcTimeout, priority, callStats);
     synchronized (pendingCalls) {
       if (closed) {
-        Promise<Message> promise = channel.eventLoop().newPromise();
-        promise.setFailure(new ConnectException());
-        return promise;
+        call.setFailure(new ConnectException());
+        return call;
       }
       pendingCalls.put(call.id, call);
       // Add timeout for cleanup if none is present
@@ -398,7 +386,7 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel 
{
             .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
       }
 
-      ByteBuffer cellBlock = 
client.buildCellBlock(call.controller.cellScanner());
+      ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
       if (cellBlock != null) {
         final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = 
RPCProtos.CellBlockMeta
             .newBuilder();
@@ -406,8 +394,8 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel 
{
         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
       }
       // Only pass priority if there one. Let zero be same as no priority.
-      if (call.controller.getPriority() != 
PayloadCarryingRpcController.PRIORITY_UNSET) {
-        requestHeaderBuilder.setPriority(call.controller.getPriority());
+      if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
+        requestHeaderBuilder.setPriority(call.getPriority());
       }
 
       RPCProtos.RequestHeader rh = requestHeaderBuilder.build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 8d9a5b3..2fdc1ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -30,9 +36,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -52,7 +55,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.client.ResponseFutureListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVM;
@@ -60,13 +65,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.Threads;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
 /**
  * Netty client for the requests and responses
  */
@@ -242,7 +240,18 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
     final AsyncRpcChannel connection = 
createRpcChannel(md.getService().getName(), addr, ticket);
 
-    Promise<Message> promise = connection.callMethod(md, pcrc, param, 
returnType, callStats);
+    final Future<Message> promise = connection.callMethod(md, param, 
pcrc.cellScanner(), returnType,
+        getMessageConverterWithRpcController(pcrc), null, 
pcrc.getCallTimeout(), pcrc.getPriority(),
+        callStats);
+
+    pcrc.notifyOnCancel(new RpcCallback<Object>() {
+      @Override
+      public void run(Object parameter) {
+        // Will automatically fail the promise with CancellationException
+        promise.cancel(true);
+      }
+    });
+
     long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
     try {
       Message response = timeout > 0 ? promise.get(timeout, 
TimeUnit.MILLISECONDS) : promise.get();
@@ -259,6 +268,18 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
   }
 
+  private MessageConverter<Message, Message> 
getMessageConverterWithRpcController(
+      final PayloadCarryingRpcController pcrc) {
+    return new
+      MessageConverter<Message, Message>() {
+        @Override
+        public Message convert(Message msg, CellScanner cellScanner) {
+          pcrc.setCellScanner(cellScanner);
+          return msg;
+        }
+      };
+  }
+
   /**
    * Call method async
    */
@@ -269,42 +290,46 @@ public class AsyncRpcClient extends AbstractRpcClient {
     try {
       connection = createRpcChannel(md.getService().getName(), addr, ticket);
       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-      GenericFutureListener<Future<Message>> listener =
-          new GenericFutureListener<Future<Message>>() {
-            @Override
-            public void operationComplete(Future<Message> future) throws 
Exception {
-              cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
cs.getStartTime());
-              if (metrics != null) {
-                metrics.updateRpc(md, param, cs);
-              }
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Call: " + md.getName() + ", callTime: " + 
cs.getCallTimeMs() + "ms");
+
+      ResponseFutureListener<Message> listener =
+        new ResponseFutureListener<Message>() {
+          @Override
+          public void operationComplete(Future<Message> future) throws 
Exception {
+            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
cs.getStartTime());
+            if (metrics != null) {
+              metrics.updateRpc(md, param, cs);
+            }
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Call: " + md.getName() + ", callTime: " + 
cs.getCallTimeMs() + "ms");
+            }
+            if (!future.isSuccess()) {
+              Throwable cause = future.cause();
+              if (cause instanceof IOException) {
+                pcrc.setFailed((IOException) cause);
+              } else {
+                pcrc.setFailed(new IOException(cause));
               }
-              if (!future.isSuccess()) {
-                Throwable cause = future.cause();
+            } else {
+              try {
+                done.run(future.get());
+              } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
                 if (cause instanceof IOException) {
                   pcrc.setFailed((IOException) cause);
                 } else {
                   pcrc.setFailed(new IOException(cause));
                 }
-              } else {
-                try {
-                  done.run(future.get());
-                } catch (ExecutionException e) {
-                  Throwable cause = e.getCause();
-                  if (cause instanceof IOException) {
-                    pcrc.setFailed((IOException) cause);
-                  } else {
-                    pcrc.setFailed(new IOException(cause));
-                  }
-                } catch (InterruptedException e) {
-                  pcrc.setFailed(new IOException(e));
-                }
+              } catch (InterruptedException e) {
+                pcrc.setFailed(new IOException(e));
               }
             }
-          };
+          }
+        };
       cs.setStartTime(EnvironmentEdgeManager.currentTime());
-      connection.callMethod(md, pcrc, param, returnType, 
cs).addListener(listener);
+      connection.callMethod(md, param, pcrc.cellScanner(), returnType,
+          getMessageConverterWithRpcController(pcrc), null,
+          pcrc.getCallTimeout(), pcrc.getPriority(), cs)
+          .addListener(listener);
     } catch (StoppedRpcClientException|FailedServerException e) {
       pcrc.setFailed(e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
new file mode 100644
index 0000000..09dda09
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Converts one {@link IOException} into another IOException.
+ */
+@InterfaceAudience.Private
+public interface IOExceptionConverter {
+  /**
+   * Converts the given IOException.
+   * @param e exception to convert
+   * @return the converted IOException
+   */
+  IOException convert(IOException e);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
new file mode 100644
index 0000000..527ac95
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Message;
+import java.io.IOException;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface to convert response Messages to the type a caller expects.
+ * @param <M> Message Type to convert
+ * @param <O> Output Type
+ */
+@InterfaceAudience.Private
+public interface MessageConverter<M,O> {
+  /**
+   * Converts Message to Output
+   * @param msg to convert
+   * @param cellScanner to use for conversion
+   * @return Output
+   * @throws IOException if message could not be converted to response
+   */
+  O convert(M msg, CellScanner cellScanner) throws IOException;
+
+  MessageConverter<Message,Message> NO_CONVERTER = new 
MessageConverter<Message, Message>() {
+    @Override
+    public Message convert(Message msg, CellScanner cellScanner) throws 
IOException {
+      return msg; // identity: "no conversion" must pass the message through, not drop it
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
new file mode 100644
index 0000000..0d05db8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Future;
+
+/**
+ * Response promise: a Netty DefaultPromise that also implements the HBase client Future.
+ * @param <T> Type of result contained in Promise
+ */
+@InterfaceAudience.Private
+public class Promise<T> extends DefaultPromise<T> implements Future<T> {
+  /**
+   * Creates a promise bound to the given executor.
+   * @param eventLoop executor handed to the DefaultPromise constructor
+   */
+  public Promise(EventExecutor eventLoop) {
+    super(eventLoop);
+  }
+}

Reply via email to