Repository: hbase
Updated Branches:
  refs/heads/master f72c3ef34 -> 3275b964c


HBASE-12668 Adapt PayloadCarryingRpcController so it can also be used in an 
async way

Signed-off-by: stack <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3275b964
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3275b964
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3275b964

Branch: refs/heads/master
Commit: 3275b964c1c1fbb20ffe646b37317496b265660b
Parents: f72c3ef
Author: Jurriaan Mous <[email protected]>
Authored: Wed Dec 10 15:08:42 2014 +0100
Committer: stack <[email protected]>
Committed: Fri Dec 12 07:44:04 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AbstractRpcClient.java     | 126 +++++++++++++++++++
 .../hbase/ipc/PayloadCarryingRpcController.java |   6 +
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 102 +--------------
 .../hbase/ipc/TimeLimitedRpcController.java     |  62 +++++++--
 .../hadoop/hbase/client/TestClientTimeouts.java |   3 +-
 5 files changed, 191 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3275b964/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 766ad8f..df43f6f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -19,17 +19,29 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
 /**
@@ -174,4 +186,118 @@ public abstract class AbstractRpcClient implements 
RpcClient {
   protected static int getPoolSize(Configuration config) {
     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
   }
+
+
+  /**
+   * Make a blocking call. Throws exceptions if there are network problems or 
if the remote code
+   * threw an exception.
+   * @param ticket Be careful which ticket you pass. A new user will mean a 
new Connection.
+   *          {@link UserProvider#getCurrent()} makes a new instance of User 
each time so will be a
+   *          new Connection each time.
+   * @return The Message response; any Cell data returned is set on <code>pcrc</code>.
+   */
+  Message callBlockingMethod(Descriptors.MethodDescriptor md, 
PayloadCarryingRpcController pcrc,
+      Message param, Message returnType, final User ticket, final 
InetSocketAddress isa)
+      throws ServiceException {
+    long startTime = 0;
+    if (LOG.isTraceEnabled()) {
+      startTime = EnvironmentEdgeManager.currentTime();
+    }
+    int callTimeout = 0;
+    CellScanner cells = null;
+    if (pcrc != null) {
+      callTimeout = pcrc.getCallTimeout();
+      cells = pcrc.cellScanner();
+      // Clear it here so we don't by mistake try and use these cells when processing 
results.
+      pcrc.setCellScanner(null);
+    }
+    Pair<Message, CellScanner> val;
+    try {
+      val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
+          pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
+      if (pcrc != null) {
+        // Shove the results into controller so they can be carried across the 
proxy/pb service void.
+        if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
+      } else if (val.getSecond() != null) {
+        throw new ServiceException("Client dropping data on the floor!");
+      }
+
+      if (LOG.isTraceEnabled()) {
+        long callTime = EnvironmentEdgeManager.currentTime() - startTime;
+        LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
+      }
+      return val.getFirst();
+    } catch (Throwable e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  /**
+   * Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> 
protocol,
+   * with the <code>ticket</code> credentials, returning the value.
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   * @param ticket Be careful which ticket you pass. A new user will mean a 
new Connection.
+   *          {@link UserProvider#getCurrent()} makes a new instance of User 
each time so will be a
+   *          new Connection each time.
+   * @return A pair with the Message response and the Cell data (if any).
+   * @throws InterruptedException
+   * @throws java.io.IOException
+   */
+  protected abstract Pair<Message, CellScanner> 
call(PayloadCarryingRpcController pcrc,
+      Descriptors.MethodDescriptor md, Message param, CellScanner cells,
+      Message returnType, User ticket, InetSocketAddress addr, int 
callTimeout, int priority) throws
+      IOException, InterruptedException;
+
+  /**
+   * Creates a "channel" that can be used by a blocking protobuf service.  
Useful for setting up
+   * protobuf blocking stubs.
+   * @return A blocking rpc channel that goes via this rpc client instance.
+   */
+  @Override
+  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
+      final User ticket, int defaultOperationTimeout) {
+    return new BlockingRpcChannelImplementation(this, sn, ticket, 
defaultOperationTimeout);
+  }
+
+  /**
+   * Blocking rpc channel that goes via hbase rpc.
+   */
+  @VisibleForTesting
+  public static class BlockingRpcChannelImplementation implements 
BlockingRpcChannel {
+    private final InetSocketAddress isa;
+    private final AbstractRpcClient rpcClient;
+    private final User ticket;
+    private final int defaultOperationTimeout;
+
+    /**
+     * @param defaultOperationTimeout - the default timeout when no timeout is 
given
+     *                                   by the caller.
+     */
+    protected BlockingRpcChannelImplementation(final AbstractRpcClient 
rpcClient,
+        final ServerName sn, final User ticket, int defaultOperationTimeout) {
+      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
+      this.rpcClient = rpcClient;
+      this.ticket = ticket;
+      this.defaultOperationTimeout = defaultOperationTimeout;
+    }
+
+    @Override
+    public Message callBlockingMethod(Descriptors.MethodDescriptor md, 
RpcController controller,
+        Message param, Message returnType) throws ServiceException {
+      PayloadCarryingRpcController pcrc;
+      if (controller != null) {
+        pcrc = (PayloadCarryingRpcController) controller;
+        if (!pcrc.hasCallTimeout()){
+          pcrc.setCallTimeout(defaultOperationTimeout);
+        }
+      } else {
+        pcrc =  new PayloadCarryingRpcController();
+        pcrc.setCallTimeout(defaultOperationTimeout);
+      }
+
+      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, 
this.ticket, this.isa);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3275b964/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index 141f234..80d6fa0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -96,4 +96,10 @@ public class PayloadCarryingRpcController
   public int getPriority() {
     return priority;
   }
+
+  @Override public void reset() {
+    super.reset();
+    priority = 0;
+    cellScanner = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3275b964/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index c5578a1..97fa475 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -19,14 +19,10 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
 import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -1144,7 +1140,8 @@ public class RpcClientImpl extends AbstractRpcClient {
    * @throws InterruptedException
    * @throws IOException
    */
-  Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, 
MethodDescriptor md,
+  @Override
+  protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, 
MethodDescriptor md,
                                   Message param, CellScanner cells,
       Message returnType, User ticket, InetSocketAddress addr, int 
callTimeout, int priority)
       throws IOException, InterruptedException {
@@ -1285,99 +1282,4 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     return connection;
   }
-
-  /**
-   * Make a blocking call. Throws exceptions if there are network problems or 
if the remote code
-   * threw an exception.
-   * @param ticket Be careful which ticket you pass. A new user will mean a 
new Connection.
-   *          {@link UserProvider#getCurrent()} makes a new instance of User 
each time so will be a
-   *          new Connection each time.
-   * @return A pair with the Message response and the Cell data (if any).
-   */
-  Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController 
pcrc,
-      Message param, Message returnType, final User ticket, final 
InetSocketAddress isa)
-  throws ServiceException {
-    long startTime = 0;
-    if (LOG.isTraceEnabled()) {
-      startTime = EnvironmentEdgeManager.currentTime();
-    }
-    int callTimeout = 0;
-    CellScanner cells = null;
-    if (pcrc != null) {
-      callTimeout = pcrc.getCallTimeout();
-      cells = pcrc.cellScanner();
-      // Clear it here so we don't by mistake try and these cells processing 
results.
-      pcrc.setCellScanner(null);
-    }
-    Pair<Message, CellScanner> val;
-    try {
-      val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
-        pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
-      if (pcrc != null) {
-        // Shove the results into controller so can be carried across the 
proxy/pb service void.
-        if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
-      } else if (val.getSecond() != null) {
-        throw new ServiceException("Client dropping data on the floor!");
-      }
-
-      if (LOG.isTraceEnabled()) {
-        long callTime = EnvironmentEdgeManager.currentTime() - startTime;
-        LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
-      }
-      return val.getFirst();
-    } catch (Throwable e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  /**
-   * Creates a "channel" that can be used by a blocking protobuf service.  
Useful setting up
-   * protobuf blocking stubs.
-   * @return A blocking rpc channel that goes via this rpc client instance.
-   */
-  @Override
-  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
-      final User ticket, int defaultOperationTimeout) {
-    return new BlockingRpcChannelImplementation(this, sn, ticket, 
defaultOperationTimeout);
-  }
-
-  /**
-   * Blocking rpc channel that goes via hbase rpc.
-   */
-  @VisibleForTesting
-  public static class BlockingRpcChannelImplementation implements 
BlockingRpcChannel {
-    private final InetSocketAddress isa;
-    private final RpcClientImpl rpcClient;
-    private final User ticket;
-    private final int defaultOperationTimeout;
-
-    /**
-     * @param defaultOperationTimeout - the default timeout when no timeout is 
given
-     *                                   by the caller.
-     */
-    protected BlockingRpcChannelImplementation(final RpcClientImpl rpcClient, 
final ServerName sn,
-        final User ticket, int defaultOperationTimeout) {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      this.rpcClient = rpcClient;
-      this.ticket = ticket;
-      this.defaultOperationTimeout = defaultOperationTimeout;
-    }
-
-    @Override
-    public Message callBlockingMethod(MethodDescriptor md, RpcController 
controller,
-                                      Message param, Message returnType) 
throws ServiceException {
-      PayloadCarryingRpcController pcrc;
-      if (controller != null) {
-        pcrc = (PayloadCarryingRpcController) controller;
-        if (!pcrc.hasCallTimeout()){
-          pcrc.setCallTimeout(defaultOperationTimeout);
-        }
-      } else {
-        pcrc =  new PayloadCarryingRpcController();
-        pcrc.setCallTimeout(defaultOperationTimeout);
-      }
-
-      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, 
this.ticket, this.isa);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3275b964/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
index 2ab2a5b..94b743f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-
-
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -38,6 +37,11 @@ public class TimeLimitedRpcController implements 
RpcController {
   protected final AtomicReference<RpcCallback<Object>> cancellationCb =
       new AtomicReference<RpcCallback<Object>>(null);
 
+  protected final AtomicReference<RpcCallback<IOException>> failureCb =
+      new AtomicReference<RpcCallback<IOException>>(null);
+
+  private IOException exception;
+
   public Integer getCallTimeout() {
     return callTimeout;
   }
@@ -52,12 +56,20 @@ public class TimeLimitedRpcController implements 
RpcController {
 
   @Override
   public String errorText() {
-    throw new UnsupportedOperationException();
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
   }
 
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
   @Override
   public boolean failed() {
-    throw new UnsupportedOperationException();
+    return this.exception != null;
   }
 
   @Override
@@ -68,16 +80,52 @@ public class TimeLimitedRpcController implements 
RpcController {
   @Override
   public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
     this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
   }
 
   @Override
   public void reset() {
-    throw new UnsupportedOperationException();
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
   }
 
   @Override
-  public void setFailed(String arg0) {
-    throw new UnsupportedOperationException();
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3275b964/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 0fbf21e..c04c4f2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
@@ -146,7 +147,7 @@ public class TestClientTimeouts {
    * Blocking rpc channel that goes via hbase rpc.
    */
   static class RandomTimeoutBlockingRpcChannel
-      extends RpcClientImpl.BlockingRpcChannelImplementation {
+      extends AbstractRpcClient.BlockingRpcChannelImplementation {
     private static final Random RANDOM = new 
Random(System.currentTimeMillis());
     public static final double CHANCE_OF_TIMEOUT = 0.3;
     private static AtomicInteger invokations = new AtomicInteger();

Reply via email to