HBASE-15745 Refactor RPC classes to better accept async changes

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/56358a0f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/56358a0f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/56358a0f

Branch: refs/heads/hbase-12439
Commit: 56358a0fd3e93b7ba8dc2a9cd8d92d961a1c24a9
Parents: 86ca09e
Author: Jurriaan Mous <jurm...@jurmo.us>
Authored: Sun May 1 15:01:01 2016 +0200
Committer: stack <st...@apache.org>
Committed: Fri May 6 13:43:45 2016 -0700

----------------------------------------------------------------------
 .../client/AbstractRegionServerCallable.java    | 158 ++++
 .../client/FastFailInterceptorContext.java      |   5 +-
 .../client/NoOpRetryingInterceptorContext.java  |   4 +-
 .../hbase/client/RegionServerCallable.java      | 107 +--
 .../hadoop/hbase/client/RetryingCallable.java   |  34 +-
 .../hbase/client/RetryingCallableBase.java      |  60 ++
 .../RetryingCallerInterceptorContext.java       |   4 +-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       | 700 +----------------
 .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java   | 743 +++++++++++++++++++
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |   8 +-
 .../hbase/ipc/AsyncServerResponseHandler.java   |   4 +-
 .../hadoop/hbase/ipc/CoprocessorRpcChannel.java |  64 +-
 .../hbase/ipc/MasterCoprocessorRpcChannel.java  |   2 +-
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |  12 +-
 .../ipc/RegionServerCoprocessorRpcChannel.java  |   2 +-
 .../hbase/ipc/SyncCoprocessorRpcChannel.java    |  79 ++
 16 files changed, 1106 insertions(+), 880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
new file mode 100644
index 0000000..ee9a781
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implementations call a RegionServer.
+ * Passed to a {@link RpcRetryingCaller} so we retry on fail.
+ * TODO: this class is actually tied to one region, because most of the paths 
make use of
+ *       the regioninfo part of location when building requests. The only 
reason it works for
+ *       multi-region requests (e.g. batch) is that they happen to not use the 
region parts.
+ *       This could be done cleaner (e.g. having a generic parameter and 2 
derived classes,
+ *       RegionCallable and actual RegionServerCallable with ServerName.
+ * @param <T> the class that the ServerCallable handles
+ */
+@InterfaceAudience.Private
+abstract class AbstractRegionServerCallable<T> implements RetryingCallableBase 
{
+  protected final Connection connection;
+  protected final TableName tableName;
+  protected final byte[] row;
+  protected HRegionLocation location;
+
+  protected final static int MIN_WAIT_DEAD_SERVER = 10000;
+
+  /**
+   * @param connection Connection to use.
+   * @param tableName Table name to which <code>row</code> belongs.
+   * @param row The row we want in <code>tableName</code>.
+   */
+  public AbstractRegionServerCallable(Connection connection, TableName 
tableName, byte[] row) {
+    this.connection = connection;
+    this.tableName = tableName;
+    this.row = row;
+  }
+
+  /**
+   * @return {@link ClusterConnection} instance used by this Callable.
+   */
+  ClusterConnection getConnection() {
+    return (ClusterConnection) this.connection;
+  }
+
+  protected HRegionLocation getLocation() {
+    return this.location;
+  }
+
+  protected void setLocation(final HRegionLocation location) {
+    this.location = location;
+  }
+
+  public TableName getTableName() {
+    return this.tableName;
+  }
+
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+    if (t instanceof SocketTimeoutException ||
+        t instanceof ConnectException ||
+        t instanceof RetriesExhaustedException ||
+        (location != null && 
getConnection().isDeadServer(location.getServerName()))) {
+      // if thrown these exceptions, we clear all the cache entries that
+      // map to that slow/dead server; otherwise, let cache miss and ask
+      // hbase:meta again to find the new location
+      if (this.location != null) {
+        getConnection().clearCaches(location.getServerName());
+      }
+    } else if (t instanceof RegionMovedException) {
+      getConnection().updateCachedLocations(tableName, row, t, location);
+    } else if (t instanceof NotServingRegionException && !retrying) {
+      // Purge cache entries for this specific region from hbase:meta cache
+      // since we don't call connect(true) when number of retries is 1.
+      getConnection().deleteCachedRegionLocation(location);
+    }
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at 
" + location;
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    // Tries hasn't been bumped up yet so we use "tries + 1" to get right 
pause time
+    long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+    if (sleep < MIN_WAIT_DEAD_SERVER
+        && (location == null || 
getConnection().isDeadServer(location.getServerName()))) {
+      sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
+    }
+    return sleep;
+  }
+
+  /**
+   * @return the HRegionInfo for the current region
+   */
+  public HRegionInfo getHRegionInfo() {
+    if (this.location == null) {
+      return null;
+    }
+    return this.location.getRegionInfo();
+  }
+
+  /**
+   * Prepare for connection to the server hosting region with row from 
tablename.  Does lookup
+   * to find region location and hosting server.
+   * @param reload Set this to true if connection should re-find the region
+   * @throws IOException e
+   */
+  @Override
+  public void prepare(final boolean reload) throws IOException {
+    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) 
{
+      this.location = regionLocator.getRegionLocation(row, reload);
+    }
+    if (this.location == null) {
+      throw new IOException("Failed to find location, tableName=" + tableName +
+          ", row=" + Bytes.toString(row) + ", reload=" + reload);
+    }
+    setClientByServiceName(this.location.getServerName());
+  }
+
+  /**
+   * Set the Rpc client for Client services
+   * @param serviceName to get client for
+   * @throws IOException When client could not be created
+   */
+  abstract void setClientByServiceName(ServerName serviceName) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
index 3cbdfb3..c9d2324 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
@@ -118,12 +118,11 @@ class FastFailInterceptorContext extends
     tries = 0;
   }
 
-  public FastFailInterceptorContext prepare(RetryingCallable<?> callable) {
+  public FastFailInterceptorContext prepare(RetryingCallableBase callable) {
     return prepare(callable, 0);
   }
 
-  public FastFailInterceptorContext prepare(RetryingCallable<?> callable,
-      int tries) {
+  public FastFailInterceptorContext prepare(RetryingCallableBase callable, int 
tries) {
     if (callable instanceof RegionServerCallable) {
       RegionServerCallable<?> retryingCallable = (RegionServerCallable<?>) 
callable;
       server = retryingCallable.getLocation().getServerName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
index 1ccf43c..f8542bd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
@@ -29,14 +29,14 @@ class NoOpRetryingInterceptorContext extends 
RetryingCallerInterceptorContext {
 
   @Override
   public RetryingCallerInterceptorContext prepare(
-      RetryingCallable<?> callable) {
+      RetryingCallableBase callable) {
     // Do Nothing
     return this;
   }
 
   @Override
   public RetryingCallerInterceptorContext prepare(
-      RetryingCallable<?> callable, int tries) {
+      RetryingCallableBase callable, int tries) {
     // Do Nothing
     return this;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index 6e0752b..d878bae 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -21,15 +21,10 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Implementations call a RegionServer and implement {@link #call(int)}.
@@ -42,16 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes;
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
-  // Public because used outside of this package over in ipc.
-  private static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
-  protected final Connection connection;
-  protected final TableName tableName;
-  protected final byte[] row;
-  protected HRegionLocation location;
-  private ClientService.BlockingInterface stub;
+public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallable<T> implements
+    RetryingCallable<T> {
 
-  protected final static int MIN_WAIT_DEAD_SERVER = 10000;
+  private ClientService.BlockingInterface stub;
 
   /**
    * @param connection Connection to use.
@@ -59,97 +48,25 @@ public abstract class RegionServerCallable<T> implements 
RetryingCallable<T> {
    * @param row The row we want in <code>tableName</code>.
    */
   public RegionServerCallable(Connection connection, TableName tableName, byte 
[] row) {
-    this.connection = connection;
-    this.tableName = tableName;
-    this.row = row;
+    super(connection, tableName, row);
   }
 
-  /**
-   * Prepare for connection to the server hosting region with row from 
tablename.  Does lookup
-   * to find region location and hosting server.
-   * @param reload Set to true to re-check the table state
-   * @throws IOException e
-   */
-  @Override
-  public void prepare(final boolean reload) throws IOException {
-    // check table state if this is a retry
-    if (reload &&
-        !tableName.equals(TableName.META_TABLE_NAME) &&
-        getConnection().isTableDisabled(tableName)) {
-      throw new TableNotEnabledException(tableName.getNameAsString() + " is 
disabled.");
-    }
-    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) 
{
-      this.location = regionLocator.getRegionLocation(row);
-    }
-    if (this.location == null) {
-      throw new IOException("Failed to find location, tableName=" + tableName +
-        ", row=" + Bytes.toString(row) + ", reload=" + reload);
-    }
-    setStub(getConnection().getClient(this.location.getServerName()));
+  void setClientByServiceName(ServerName service) throws IOException {
+    this.setStub(getConnection().getClient(service));
   }
 
   /**
-   * @return {@link HConnection} instance used by this Callable.
+   * @return Client Rpc protobuf communication stub
    */
-  HConnection getConnection() {
-    return (HConnection) this.connection;
-  }
-
   protected ClientService.BlockingInterface getStub() {
     return this.stub;
   }
 
-  void setStub(final ClientService.BlockingInterface stub) {
-    this.stub = stub;
-  }
-
-  protected HRegionLocation getLocation() {
-    return this.location;
-  }
-
-  protected void setLocation(final HRegionLocation location) {
-    this.location = location;
-  }
-
-  public TableName getTableName() {
-    return this.tableName;
-  }
-
-  public byte [] getRow() {
-    return this.row;
-  }
-
-  @Override
-  public void throwable(Throwable t, boolean retrying) {
-    if (location != null) {
-      getConnection().updateCachedLocations(tableName, 
location.getRegionInfo().getRegionName(),
-          row, t, location.getServerName());
-    }
-  }
-
-  @Override
-  public String getExceptionMessageAdditionalDetail() {
-    return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at 
" + location;
-  }
-
-  @Override
-  public long sleep(long pause, int tries) {
-    // Tries hasn't been bumped up yet so we use "tries + 1" to get right 
pause time
-    long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
-    if (sleep < MIN_WAIT_DEAD_SERVER
-        && (location == null || 
getConnection().isDeadServer(location.getServerName()))) {
-      sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
-    }
-    return sleep;
-  }
-
   /**
-   * @return the HRegionInfo for the current region
+   * Set the client protobuf communication stub
+   * @param stub to set
    */
-  public HRegionInfo getHRegionInfo() {
-    if (this.location == null) {
-      return null;
-    }
-    return this.location.getRegionInfo();
+  void setStub(final ClientService.BlockingInterface stub) {
+    this.stub = stub;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
index 7a32175..2377a0d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
@@ -19,8 +19,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -29,23 +27,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
  * @param <T> result class from executing <tt>this</tt>
  */
 @InterfaceAudience.Private
-public interface RetryingCallable<T> {
-  /**
-   * Prepare by setting up any connections to servers, etc., ahead of {@link 
#call(int)} invocation.
-   * @param reload Set this to true if need to requery locations
-   * @throws IOException e
-   */
-  void prepare(final boolean reload) throws IOException;
-
-  /**
-   * Called when {@link #call(int)} throws an exception and we are going to 
retry; take action to
-   * make it so we succeed on next call (clear caches, do relookup of 
locations, etc.).
-   * @param t
-   * @param retrying True if we are in retrying mode (we are not in retrying 
mode when max
-   * retries == 1; we ARE in retrying mode if retries &gt; 1 even when we are 
the last attempt)
-   */
-  void throwable(final Throwable t, boolean retrying);
-
+public interface RetryingCallable<T> extends RetryingCallableBase {
   /**
    * Computes a result, or throws an exception if unable to do so.
    *
@@ -54,18 +36,4 @@ public interface RetryingCallable<T> {
    * @throws Exception if unable to compute a result
    */
   T call(int callTimeout) throws Exception;
-
-  /**
-   * @return Some details from the implementation that we would like to add to 
a terminating
-   * exception; i.e. a fatal exception is being thrown ending retries and we 
might like to add
-   * more implementation-specific detail on to the exception being thrown.
-   */
-  String getExceptionMessageAdditionalDetail();
-
-  /**
-   * @param pause
-   * @param tries
-   * @return Suggestion on how much to sleep between retries
-   */
-  long sleep(final long pause, final int tries);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
new file mode 100644
index 0000000..483f6c2
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * All generic methods for a Callable that can be retried. It is extended with 
Sync and
+ * Async versions.
+ */
+@InterfaceAudience.Private
+public interface RetryingCallableBase {
+  /**
+   * Prepare by setting up any connections to servers, etc., ahead of call 
invocation.
+   * @param reload Set this to true if need to requery locations
+   * @throws IOException e
+   */
+  void prepare(final boolean reload) throws IOException;
+
+  /**
+   * Called when call throws an exception and we are going to retry; take 
action to
+   * make it so we succeed on next call (clear caches, do relookup of 
locations, etc.).
+   * @param t        throwable which was thrown
+   * @param retrying True if we are in retrying mode (we are not in retrying 
mode when max
+   *                 retries == 1; we ARE in retrying mode if retries &gt; 1 
even when we are the
+   *                 last attempt)
+   */
+  void throwable(final Throwable t, boolean retrying);
+
+  /**
+   * @return Some details from the implementation that we would like to add to 
a terminating
+   *         exception; i.e. a fatal exception is being thrown ending retries 
and we might like to
+   *         add more implementation-specific detail on to the exception being 
thrown.
+   */
+  String getExceptionMessageAdditionalDetail();
+
+  /**
+   * @param pause time to pause
+   * @param tries amount of tries until till sleep
+   * @return Suggestion on how much to sleep between retries
+   */
+  long sleep(final long pause, final int tries);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
index a9f414f..b0ba9f5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
@@ -50,7 +50,7 @@ abstract class RetryingCallerInterceptorContext {
    *         used for use in the current retrying call
    */
   public abstract RetryingCallerInterceptorContext prepare(
-      RetryingCallable<?> callable);
+      RetryingCallableBase callable);
 
   /**
    * Telescopic extension that takes which of the many retries we are currently
@@ -65,5 +65,5 @@ abstract class RetryingCallerInterceptorContext {
    *         retrying call
    */
   public abstract RetryingCallerInterceptorContext prepare(
-      RetryingCallable<?> callable, int tries);
+      RetryingCallableBase callable, int tries);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 53eb824..60dc5e4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -17,275 +17,22 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import 
org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Promise;
 
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+
 /**
- * Netty RPC channel
+ * Interface for Async Rpc Channels
  */
 @InterfaceAudience.Private
-public class AsyncRpcChannel {
-  private static final Log LOG = 
LogFactory.getLog(AsyncRpcChannel.class.getName());
-
-  private static final int MAX_SASL_RETRIES = 5;
-
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> 
TOKEN_HANDDLERS
-    = new HashMap<>();
-
-  static {
-    
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
-      new AuthenticationTokenSelector());
-  }
-
-  final AsyncRpcClient client;
-
-  // Contains the channel to work with.
-  // Only exists when connected
-  private Channel channel;
-
-  String name;
-  final User ticket;
-  final String serviceName;
-  final InetSocketAddress address;
-
-  private int failureCounter = 0;
-
-  boolean useSasl;
-  AuthMethod authMethod;
-  private int reloginMaxBackoff;
-  private Token<? extends TokenIdentifier> token;
-  private String serverPrincipal;
-
-  // NOTE: closed and connected flags below are only changed when a lock on 
pendingCalls
-  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, 
AsyncCall>();
-  private boolean connected = false;
-  private boolean closed = false;
-
-  private Timeout cleanupTimer;
-
-  private final TimerTask timeoutTask = new TimerTask() {
-    @Override
-    public void run(Timeout timeout) throws Exception {
-      cleanupCalls();
-    }
-  };
-
-  /**
-   * Constructor for netty RPC channel
-   * @param bootstrap to construct channel on
-   * @param client to connect with
-   * @param ticket of user which uses connection
-   * @param serviceName name of service to connect to
-   * @param address to connect to
-   */
-  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, 
User ticket,
-      String serviceName, InetSocketAddress address) {
-    this.client = client;
-
-    this.ticket = ticket;
-    this.serviceName = serviceName;
-    this.address = address;
-
-    this.channel = connect(bootstrap).channel();
-
-    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
-        + ((ticket == null) ? " from unknown user" : (" from " + 
ticket.getName())));
-  }
-
-  /**
-   * Connect to channel
-   * @param bootstrap to connect to
-   * @return future of connection
-   */
-  private ChannelFuture connect(final Bootstrap bootstrap) {
-    return bootstrap.remoteAddress(address).connect()
-        .addListener(new GenericFutureListener<ChannelFuture>() {
-          @Override
-          public void operationComplete(final ChannelFuture f) throws 
Exception {
-            if (!f.isSuccess()) {
-              retryOrClose(bootstrap, failureCounter++, client.failureSleep, 
f.cause());
-              return;
-            }
-            channel = f.channel();
-
-            setupAuthorization();
-
-            ByteBuf b = channel.alloc().directBuffer(6);
-            createPreamble(b, authMethod);
-            
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            if (useSasl) {
-              UserGroupInformation ticket = 
AsyncRpcChannel.this.ticket.getUGI();
-              if (authMethod == AuthMethod.KERBEROS) {
-                if (ticket != null && ticket.getRealUser() != null) {
-                  ticket = ticket.getRealUser();
-                }
-              }
-              SaslClientHandler saslHandler;
-              if (ticket == null) {
-                throw new FatalConnectionException("ticket/user is null");
-              }
-              final UserGroupInformation realTicket = ticket;
-              saslHandler = ticket.doAs(new 
PrivilegedExceptionAction<SaslClientHandler>() {
-                @Override
-                public SaslClientHandler run() throws IOException {
-                  return getSaslHandler(realTicket, bootstrap);
-                }
-              });
-              if (saslHandler != null) {
-                // Sasl connect is successful. Let's set up Sasl channel 
handler
-                channel.pipeline().addFirst(saslHandler);
-              } else {
-                // fall back to simple auth because server told us so.
-                authMethod = AuthMethod.SIMPLE;
-                useSasl = false;
-              }
-            } else {
-              startHBaseConnection(f.channel());
-            }
-          }
-        });
-  }
-
-  /**
-   * Start HBase connection
-   * @param ch channel to start connection on
-   */
-  private void startHBaseConnection(Channel ch) {
-    ch.pipeline().addLast("frameDecoder",
-      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    try {
-      writeChannelHeader(ch).addListener(new 
GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            close(future.cause());
-            return;
-          }
-          List<AsyncCall> callsToWrite;
-          synchronized (pendingCalls) {
-            connected = true;
-            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-          }
-          for (AsyncCall call : callsToWrite) {
-            writeRequest(call);
-          }
-        }
-      });
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Get SASL handler
-   * @param bootstrap to reconnect to
-   * @return new SASL handler
-   * @throws java.io.IOException if handler failed to create
-   */
-  private SaslClientHandler getSaslHandler(final UserGroupInformation 
realTicket,
-      final Bootstrap bootstrap) throws IOException {
-    return new SaslClientHandler(realTicket, authMethod, token, 
serverPrincipal,
-        client.fallbackAllowed,
-        client.conf.get("hbase.rpc.protection",
-          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
-        new SaslClientHandler.SaslExceptionHandler() {
-          @Override
-          public void handle(int retryCount, Random random, Throwable cause) {
-            try {
-              // Handle Sasl failure. Try to potentially get new credentials
-              handleSaslConnectionFailure(retryCount, cause, realTicket);
-
-              retryOrClose(bootstrap, failureCounter++, 
random.nextInt(reloginMaxBackoff) + 1,
-                cause);
-            } catch (IOException | InterruptedException e) {
-              close(e);
-            }
-          }
-        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
-          @Override
-          public void onSuccess(Channel channel) {
-            startHBaseConnection(channel);
-          }
-        });
-  }
-
-  /**
-   * Retry to connect or close
-   * @param bootstrap to connect with
-   * @param failureCount failure count
-   * @param e exception of fail
-   */
-  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long 
timeout,
-      Throwable e) {
-    if (failureCount < client.maxRetries) {
-      client.newTimeout(new TimerTask() {
-        @Override
-        public void run(Timeout timeout) throws Exception {
-          connect(bootstrap);
-        }
-      }, timeout, TimeUnit.MILLISECONDS);
-    } else {
-      client.failedServers.addToFailedServers(address);
-      close(e);
-    }
-  }
+public interface AsyncRpcChannel {
 
   /**
    * Calls method on channel
@@ -294,439 +41,32 @@ public class AsyncRpcChannel {
    * @param request to send
    * @param responsePrototype to construct response with
    */
-  public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
+  Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
       final PayloadCarryingRpcController controller, final Message request,
-      final Message responsePrototype, MetricsConnection.CallStats callStats) {
-    final AsyncCall call = new AsyncCall(channel.eventLoop(), 
client.callIdCnt.getAndIncrement(),
-        method, request, controller, responsePrototype, callStats);
-    controller.notifyOnCancel(new RpcCallback<Object>() {
-      @Override
-      public void run(Object parameter) {
-        // TODO: do not need to call AsyncCall.setFailed?
-        synchronized (pendingCalls) {
-          pendingCalls.remove(call.id);
-        }
-      }
-    });
-    // TODO: this should be handled by PayloadCarryingRpcController.
-    if (controller.isCanceled()) {
-      // To finish if the call was cancelled before we set the notification 
(race condition)
-      call.cancel(true);
-      return call;
-    }
-
-    synchronized (pendingCalls) {
-      if (closed) {
-        Promise<Message> promise = channel.eventLoop().newPromise();
-        promise.setFailure(new ConnectException());
-        return promise;
-      }
-      pendingCalls.put(call.id, call);
-      // Add timeout for cleanup if none is present
-      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
-      }
-      if (!connected) {
-        return call;
-      }
-    }
-    writeRequest(call);
-    return call;
-  }
-
-  AsyncCall removePendingCall(int id) {
-    synchronized (pendingCalls) {
-      return pendingCalls.remove(id);
-    }
-  }
+      final Message responsePrototype, MetricsConnection.CallStats callStats);
 
   /**
-   * Write the channel header
-   * @param channel to write to
-   * @return future of write
-   * @throws java.io.IOException on failure to write
+   * Get the EventLoop on which this channel operated
+   * @return EventLoop
    */
-  private ChannelFuture writeChannelHeader(Channel channel) throws IOException 
{
-    RPCProtos.ConnectionHeader.Builder headerBuilder = 
RPCProtos.ConnectionHeader.newBuilder()
-        .setServiceName(serviceName);
-
-    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), 
authMethod);
-    if (userInfoPB != null) {
-      headerBuilder.setUserInfo(userInfoPB);
-    }
-
-    if (client.codec != null) {
-      
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
-    }
-    if (client.compressor != null) {
-      
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
-    }
-
-    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
-    RPCProtos.ConnectionHeader header = headerBuilder.build();
-
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-
-    ByteBuf b = channel.alloc().directBuffer(totalSize);
-
-    b.writeInt(header.getSerializedSize());
-    b.writeBytes(header.toByteArray());
-
-    return channel.writeAndFlush(b);
-  }
-
-  /**
-   * Write request to channel
-   * @param call to write
-   */
-  private void writeRequest(final AsyncCall call) {
-    try {
-      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = 
RPCProtos.RequestHeader
-          .newBuilder();
-      
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
-          .setRequestParam(call.param != null);
-
-      if (Trace.isTracing()) {
-        Span s = Trace.currentSpan();
-        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
-            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-      }
-
-      ByteBuffer cellBlock = 
client.buildCellBlock(call.controller.cellScanner());
-      if (cellBlock != null) {
-        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = 
RPCProtos.CellBlockMeta
-            .newBuilder();
-        cellBlockBuilder.setLength(cellBlock.limit());
-        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
-      }
-      // Only pass priority if there one. Let zero be same as no priority.
-      if (call.controller.getPriority() != 
PayloadCarryingRpcController.PRIORITY_UNSET) {
-        requestHeaderBuilder.setPriority(call.controller.getPriority());
-      }
-
-      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
-      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
-      if (cellBlock != null) {
-        totalSize += cellBlock.remaining();
-      }
-
-      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
-      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
-        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, 
cellBlock));
-      }
-
-      channel.writeAndFlush(b).addListener(new CallWriteListener(this, 
call.id));
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Set up server authorization
-   * @throws java.io.IOException if auth setup failed
-   */
-  private void setupAuthorization() throws IOException {
-    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
-    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
-    this.token = null;
-    if (useSasl && securityInfo != null) {
-      AuthenticationProtos.TokenIdentifier.Kind tokenKind = 
securityInfo.getTokenKind();
-      if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = 
TOKEN_HANDDLERS.get(tokenKind);
-        if (tokenSelector != null) {
-          token = tokenSelector.selectToken(new Text(client.clusterId),
-            ticket.getUGI().getTokens());
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("No token selector found for type " + tokenKind);
-        }
-      }
-      String serverKey = securityInfo.getServerPrincipal();
-      if (serverKey == null) {
-        throw new IOException("Can't obtain server Kerberos config key from 
SecurityInfo");
-      }
-      this.serverPrincipal = 
SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
-        address.getAddress().getCanonicalHostName().toLowerCase());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RPC Server Kerberos principal name for service=" + 
serviceName + " is "
-            + serverPrincipal);
-      }
-    }
-
-    if (!useSasl) {
-      authMethod = AuthMethod.SIMPLE;
-    } else if (token != null) {
-      authMethod = AuthMethod.DIGEST;
-    } else {
-      authMethod = AuthMethod.KERBEROS;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "Use " + authMethod + " authentication for service " + serviceName + 
", sasl=" + useSasl);
-    }
-    reloginMaxBackoff = 
client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
-  }
-
-  /**
-   * Build the user information
-   * @param ugi User Group Information
-   * @param authMethod Authorization method
-   * @return UserInformation protobuf
-   */
-  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, 
AuthMethod authMethod) {
-    if (ugi == null || authMethod == AuthMethod.DIGEST) {
-      // Don't send user for token auth
-      return null;
-    }
-    RPCProtos.UserInformation.Builder userInfoPB = 
RPCProtos.UserInformation.newBuilder();
-    if (authMethod == AuthMethod.KERBEROS) {
-      // Send effective user for Kerberos auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-    } else if (authMethod == AuthMethod.SIMPLE) {
-      // Send both effective user and real user for simple auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-      if (ugi.getRealUser() != null) {
-        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-      }
-    }
-    return userInfoPB.build();
-  }
-
-  /**
-   * Create connection preamble
-   * @param byteBuf to write to
-   * @param authMethod to write
-   */
-  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
-    byteBuf.writeBytes(HConstants.RPC_HEADER);
-    byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
-    byteBuf.writeByte(authMethod.code);
-  }
-
-  private void close0(Throwable e) {
-    List<AsyncCall> toCleanup;
-    synchronized (pendingCalls) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
-      pendingCalls.clear();
-    }
-    IOException closeException = null;
-    if (e != null) {
-      if (e instanceof IOException) {
-        closeException = (IOException) e;
-      } else {
-        closeException = new IOException(e);
-      }
-    }
-    // log the info
-    if (LOG.isDebugEnabled() && closeException != null) {
-      LOG.debug(name + ": closing ipc connection to " + address, 
closeException);
-    }
-    if (cleanupTimer != null) {
-      cleanupTimer.cancel();
-      cleanupTimer = null;
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(closeException != null ? closeException
-          : new ConnectionClosingException(
-              "Call id=" + call.id + " on server " + address + " aborted: 
connection is closing"));
-    }
-    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(name + ": closed");
-    }
-  }
+  EventExecutor getEventExecutor();
 
   /**
    * Close connection
-   * @param e exception on close
+   * @param cause of closure.
    */
-  public void close(final Throwable e) {
-    client.removeConnection(this);
-
-    // Move closing from the requesting thread to the channel thread
-    if (channel.eventLoop().inEventLoop()) {
-      close0(e);
-    } else {
-      channel.eventLoop().execute(new Runnable() {
-        @Override
-        public void run() {
-          close0(e);
-        }
-      });
-    }
-  }
-
-  /**
-   * Clean up calls.
-   */
-  private void cleanupCalls() {
-    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long nextCleanupTaskDelay = -1L;
-    synchronized (pendingCalls) {
-      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); 
iter.hasNext();) {
-        AsyncCall call = iter.next();
-        long timeout = call.getRpcTimeout();
-        if (timeout > 0) {
-          if (currentTime - call.getStartTime() >= timeout) {
-            iter.remove();
-            toCleanup.add(call);
-          } else {
-            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
-              nextCleanupTaskDelay = timeout;
-            }
-          }
-        }
-      }
-      if (nextCleanupTaskDelay > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, 
TimeUnit.MILLISECONDS);
-      } else {
-        cleanupTimer = null;
-      }
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", 
waitTime="
-          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + 
call.getRpcTimeout()));
-    }
-  }
+  void close(Throwable cause);
 
   /**
    * Check if the connection is alive
+   *
    * @return true if alive
    */
-  public boolean isAlive() {
-    return channel.isOpen();
-  }
-
-  /**
-   * Check if user should authenticate over Kerberos
-   * @return true if should be authenticated over Kerberos
-   * @throws java.io.IOException on failure of check
-   */
-  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
-      // Make sure user logged in using Kerberos either keytab or TGT
-      loginUser.hasKerberosCredentials() &&
-      // relogin only in case it is the login user (e.g. JT)
-      // or superuser (like oozie).
-      (loginUser.equals(currentUser) || loginUser.equals(realUser));
-  }
+  boolean isAlive();
 
   /**
-   * If multiple clients with the same principal try to connect to the same 
server at the same time,
-   * the server assumes a replay attack is in progress. This is a feature of 
kerberos. In order to
-   * work around this, what is done is that the client backs off randomly and 
tries to initiate the
-   * connection again. The other problem is to do with ticket expiry. To 
handle that, a relogin is
-   * attempted.
-   * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} 
method. In case when the
-   * user doesn't have valid credentials, we don't need to retry (from cache 
or ticket). In such
-   * cases, it is prudent to throw a runtime exception when we receive a 
SaslException from the
-   * underlying authentication implementation, so there is no retry from other 
high level (for eg,
-   * HCM or HBaseAdmin).
-   * </p>
-   * @param currRetries retry count
-   * @param ex exception describing fail
-   * @param user which is trying to connect
-   * @throws java.io.IOException if IO fail
-   * @throws InterruptedException if thread is interrupted
+   * Get the address on which this channel operates
+   * @return InetSocketAddress
    */
-  private void handleSaslConnectionFailure(final int currRetries, final 
Throwable ex,
-      final UserGroupInformation user) throws IOException, 
InterruptedException {
-    user.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws IOException, InterruptedException {
-        if (shouldAuthenticateOverKrb()) {
-          if (currRetries < MAX_SASL_RETRIES) {
-            LOG.debug("Exception encountered while connecting to the server : 
" + ex);
-            // try re-login
-            if (UserGroupInformation.isLoginKeytabBased()) {
-              UserGroupInformation.getLoginUser().reloginFromKeytab();
-            } else {
-              UserGroupInformation.getLoginUser().reloginFromTicketCache();
-            }
-
-            // Should reconnect
-            return null;
-          } else {
-            String msg = "Couldn't setup connection for "
-                + UserGroupInformation.getLoginUser().getUserName() + " to " + 
serverPrincipal;
-            LOG.warn(msg);
-            throw (IOException) new IOException(msg).initCause(ex);
-          }
-        } else {
-          LOG.warn("Exception encountered while connecting to " + "the server 
: " + ex);
-        }
-        if (ex instanceof RemoteException) {
-          throw (RemoteException) ex;
-        }
-        if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed."
-              + " The most likely cause is missing or invalid credentials." + 
" Consider 'kinit'.";
-          LOG.fatal(msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-        throw new IOException(ex);
-      }
-    });
-  }
-
-  public int getConnectionHashCode() {
-    return ConnectionId.hashCode(ticket, serviceName, address);
-  }
-
-  @Override
-  public int hashCode() {
-    return getConnectionHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof AsyncRpcChannel) {
-      AsyncRpcChannel channel = (AsyncRpcChannel) obj;
-      return channel.hashCode() == obj.hashCode();
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return this.address.toString() + "/" + this.serviceName + "/" + 
this.ticket;
-  }
-
-  /**
-   * Listens to call writes and fails if write failed
-   */
-  private static final class CallWriteListener implements 
ChannelFutureListener {
-    private final AsyncRpcChannel rpcChannel;
-    private final int id;
-
-    public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
-      this.rpcChannel = asyncRpcChannel;
-      this.id = id;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (!future.isSuccess()) {
-        AsyncCall call = rpcChannel.removePendingCall(id);
-        if (call != null) {
-          if (future.cause() instanceof IOException) {
-            call.setFailed((IOException) future.cause());
-          } else {
-            call.setFailed(new IOException(future.cause()));
-          }
-        }
-      }
-    }
-  }
+  InetSocketAddress getAddress();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
new file mode 100644
index 0000000..7cc9e78
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslClientHandler;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+
+/**
+ * Netty RPC channel
+ */
+@InterfaceAudience.Private
+public class AsyncRpcChannelImpl implements AsyncRpcChannel {
+  private static final Log LOG = 
LogFactory.getLog(AsyncRpcChannelImpl.class.getName());
+
+  private static final int MAX_SASL_RETRIES = 5;
+
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> 
TOKEN_HANDDLERS
+    = new HashMap<>();
+
+  static {
+    
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
+      new AuthenticationTokenSelector());
+  }
+
+  final AsyncRpcClient client;
+
+  // Contains the channel to work with.
+  // Only exists when connected
+  private Channel channel;
+
+  String name;
+  final User ticket;
+  final String serviceName;
+  final InetSocketAddress address;
+
+  private int failureCounter = 0;
+
+  boolean useSasl;
+  AuthMethod authMethod;
+  private int reloginMaxBackoff;
+  private Token<? extends TokenIdentifier> token;
+  private String serverPrincipal;
+
+  // NOTE: closed and connected flags below are only changed when a lock on 
pendingCalls
+  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, 
AsyncCall>();
+  private boolean connected = false;
+  private boolean closed = false;
+
+  private Timeout cleanupTimer;
+
+  private final TimerTask timeoutTask = new TimerTask() {
+    @Override
+    public void run(Timeout timeout) throws Exception {
+      cleanupCalls();
+    }
+  };
+
+  /**
+   * Constructor for netty RPC channel
+   * @param bootstrap to construct channel on
+   * @param client to connect with
+   * @param ticket of user which uses connection
+   * @param serviceName name of service to connect to
+   * @param address to connect to
+   */
+  public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, 
User ticket,
+      String serviceName, InetSocketAddress address) {
+    this.client = client;
+
+    this.ticket = ticket;
+    this.serviceName = serviceName;
+    this.address = address;
+
+    this.channel = connect(bootstrap).channel();
+
+    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
+        + ((ticket == null) ? " from unknown user" : (" from " + 
ticket.getName())));
+  }
+
+  /**
+   * Connect to channel
+   * @param bootstrap to connect to
+   * @return future of connection
+   */
+  private ChannelFuture connect(final Bootstrap bootstrap) {
+    return bootstrap.remoteAddress(address).connect()
+        .addListener(new GenericFutureListener<ChannelFuture>() {
+          @Override
+          public void operationComplete(final ChannelFuture f) throws 
Exception {
+            if (!f.isSuccess()) {
+              retryOrClose(bootstrap, failureCounter++, client.failureSleep, 
f.cause());
+              return;
+            }
+            channel = f.channel();
+
+            setupAuthorization();
+
+            ByteBuf b = channel.alloc().directBuffer(6);
+            createPreamble(b, authMethod);
+            
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+            if (useSasl) {
+              UserGroupInformation ticket = 
AsyncRpcChannelImpl.this.ticket.getUGI();
+              if (authMethod == AuthMethod.KERBEROS) {
+                if (ticket != null && ticket.getRealUser() != null) {
+                  ticket = ticket.getRealUser();
+                }
+              }
+              SaslClientHandler saslHandler;
+              if (ticket == null) {
+                throw new FatalConnectionException("ticket/user is null");
+              }
+              final UserGroupInformation realTicket = ticket;
+              saslHandler = ticket.doAs(new 
PrivilegedExceptionAction<SaslClientHandler>() {
+                @Override
+                public SaslClientHandler run() throws IOException {
+                  return getSaslHandler(realTicket, bootstrap);
+                }
+              });
+              if (saslHandler != null) {
+                // Sasl connect is successful. Let's set up Sasl channel 
handler
+                channel.pipeline().addFirst(saslHandler);
+              } else {
+                // fall back to simple auth because server told us so.
+                authMethod = AuthMethod.SIMPLE;
+                useSasl = false;
+              }
+            } else {
+              startHBaseConnection(f.channel());
+            }
+          }
+        });
+  }
+
+  /**
+   * Start HBase connection
+   * @param ch channel to start connection on
+   */
+  private void startHBaseConnection(Channel ch) {
+    ch.pipeline().addLast("frameDecoder",
+      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+    try {
+      writeChannelHeader(ch).addListener(new 
GenericFutureListener<ChannelFuture>() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            close(future.cause());
+            return;
+          }
+          List<AsyncCall> callsToWrite;
+          synchronized (pendingCalls) {
+            connected = true;
+            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+          }
+          for (AsyncCall call : callsToWrite) {
+            writeRequest(call);
+          }
+        }
+      });
+    } catch (IOException e) {
+      close(e);
+    }
+  }
+
+  /**
+   * Get SASL handler
+   * @param bootstrap to reconnect to
+   * @return new SASL handler
+   * @throws java.io.IOException if handler failed to create
+   */
+  private SaslClientHandler getSaslHandler(final UserGroupInformation 
realTicket,
+      final Bootstrap bootstrap) throws IOException {
+    return new SaslClientHandler(realTicket, authMethod, token, 
serverPrincipal,
+        client.fallbackAllowed,
+        client.conf.get("hbase.rpc.protection",
+          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
+        new SaslClientHandler.SaslExceptionHandler() {
+          @Override
+          public void handle(int retryCount, Random random, Throwable cause) {
+            try {
+              // Handle Sasl failure. Try to potentially get new credentials
+              handleSaslConnectionFailure(retryCount, cause, realTicket);
+
+              retryOrClose(bootstrap, failureCounter++, 
random.nextInt(reloginMaxBackoff) + 1,
+                cause);
+            } catch (IOException | InterruptedException e) {
+              close(e);
+            }
+          }
+        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
+          @Override
+          public void onSuccess(Channel channel) {
+            startHBaseConnection(channel);
+          }
+        });
+  }
+
+  /**
+   * Retry to connect or close
+   * @param bootstrap to connect with
+   * @param failureCount failure count
+   * @param e exception of fail
+   */
+  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long 
timeout,
+      Throwable e) {
+    if (failureCount < client.maxRetries) {
+      client.newTimeout(new TimerTask() {
+        @Override
+        public void run(Timeout timeout) throws Exception {
+          connect(bootstrap);
+        }
+      }, timeout, TimeUnit.MILLISECONDS);
+    } else {
+      client.failedServers.addToFailedServers(address);
+      close(e);
+    }
+  }
+
+  /**
+   * Calls method on channel
+   * @param method to call
+   * @param controller to run call with
+   * @param request to send
+   * @param responsePrototype to construct response with
+   */
+  public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
+      final PayloadCarryingRpcController controller, final Message request,
+      final Message responsePrototype, MetricsConnection.CallStats callStats) {
+    final AsyncCall call = new AsyncCall(channel.eventLoop(), 
client.callIdCnt.getAndIncrement(),
+        method, request, controller, responsePrototype, callStats);
+    controller.notifyOnCancel(new RpcCallback<Object>() {
+      @Override
+      public void run(Object parameter) {
+        // TODO: do not need to call AsyncCall.setFailed?
+        synchronized (pendingCalls) {
+          pendingCalls.remove(call.id);
+        }
+      }
+    });
+    // TODO: this should be handled by PayloadCarryingRpcController.
+    if (controller.isCanceled()) {
+      // To finish if the call was cancelled before we set the notification 
(race condition)
+      call.cancel(true);
+      return call;
+    }
+
+    synchronized (pendingCalls) {
+      if (closed) {
+        Promise<Message> promise = channel.eventLoop().newPromise();
+        promise.setFailure(new ConnectException());
+        return promise;
+      }
+      pendingCalls.put(call.id, call);
+      // Add timeout for cleanup if none is present
+      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
+        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+      }
+      if (!connected) {
+        return call;
+      }
+    }
+    writeRequest(call);
+    return call;
+  }
+
+  @Override
+  public EventLoop getEventExecutor() {
+    return this.channel.eventLoop();
+  }
+
+  AsyncCall removePendingCall(int id) {
+    synchronized (pendingCalls) {
+      return pendingCalls.remove(id);
+    }
+  }
+
+  /**
+   * Write the channel header
+   * @param channel to write to
+   * @return future of write
+   * @throws java.io.IOException on failure to write
+   */
+  private ChannelFuture writeChannelHeader(Channel channel) throws IOException 
{
+    RPCProtos.ConnectionHeader.Builder headerBuilder = 
RPCProtos.ConnectionHeader.newBuilder()
+        .setServiceName(serviceName);
+
+    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), 
authMethod);
+    if (userInfoPB != null) {
+      headerBuilder.setUserInfo(userInfoPB);
+    }
+
+    if (client.codec != null) {
+      
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
+    }
+    if (client.compressor != null) {
+      
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
+    }
+
+    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
+    RPCProtos.ConnectionHeader header = headerBuilder.build();
+
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
+
+    ByteBuf b = channel.alloc().directBuffer(totalSize);
+
+    b.writeInt(header.getSerializedSize());
+    b.writeBytes(header.toByteArray());
+
+    return channel.writeAndFlush(b);
+  }
+
+  /**
+   * Write request to channel
+   * @param call to write
+   */
+  private void writeRequest(final AsyncCall call) {
+    try {
+      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = 
RPCProtos.RequestHeader
+          .newBuilder();
+      
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
+          .setRequestParam(call.param != null);
+
+      if (Trace.isTracing()) {
+        Span s = Trace.currentSpan();
+        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
+            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+      }
+
+      ByteBuffer cellBlock = 
client.buildCellBlock(call.controller.cellScanner());
+      if (cellBlock != null) {
+        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = 
RPCProtos.CellBlockMeta
+            .newBuilder();
+        cellBlockBuilder.setLength(cellBlock.limit());
+        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
+      }
+      // Only pass priority if there one. Let zero be same as no priority.
+      if (call.controller.getPriority() != 
PayloadCarryingRpcController.PRIORITY_UNSET) {
+        requestHeaderBuilder.setPriority(call.controller.getPriority());
+      }
+
+      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
+
+      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
+      if (cellBlock != null) {
+        totalSize += cellBlock.remaining();
+      }
+
+      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
+      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
+        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, 
cellBlock));
+      }
+
+      channel.writeAndFlush(b).addListener(new CallWriteListener(this, 
call.id));
+    } catch (IOException e) {
+      close(e);
+    }
+  }
+
+  /**
+   * Set up server authorization
+   * @throws java.io.IOException if auth setup failed
+   */
+  private void setupAuthorization() throws IOException {
+    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
+    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
+
+    this.token = null;
+    if (useSasl && securityInfo != null) {
+      AuthenticationProtos.TokenIdentifier.Kind tokenKind = 
securityInfo.getTokenKind();
+      if (tokenKind != null) {
+        TokenSelector<? extends TokenIdentifier> tokenSelector = 
TOKEN_HANDDLERS.get(tokenKind);
+        if (tokenSelector != null) {
+          token = tokenSelector.selectToken(new Text(client.clusterId),
+            ticket.getUGI().getTokens());
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("No token selector found for type " + tokenKind);
+        }
+      }
+      String serverKey = securityInfo.getServerPrincipal();
+      if (serverKey == null) {
+        throw new IOException("Can't obtain server Kerberos config key from 
SecurityInfo");
+      }
+      this.serverPrincipal = 
SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
+        address.getAddress().getCanonicalHostName().toLowerCase());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RPC Server Kerberos principal name for service=" + 
serviceName + " is "
+            + serverPrincipal);
+      }
+    }
+
+    if (!useSasl) {
+      authMethod = AuthMethod.SIMPLE;
+    } else if (token != null) {
+      authMethod = AuthMethod.DIGEST;
+    } else {
+      authMethod = AuthMethod.KERBEROS;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Use " + authMethod + " authentication for service " + serviceName + 
", sasl=" + useSasl);
+    }
+    reloginMaxBackoff = 
client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
+  }
+
+  /**
+   * Build the user information
+   * @param ugi User Group Information
+   * @param authMethod Authorization method
+   * @return UserInformation protobuf
+   */
+  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, 
AuthMethod authMethod) {
+    if (ugi == null || authMethod == AuthMethod.DIGEST) {
+      // Don't send user for token auth
+      return null;
+    }
+    RPCProtos.UserInformation.Builder userInfoPB = 
RPCProtos.UserInformation.newBuilder();
+    if (authMethod == AuthMethod.KERBEROS) {
+      // Send effective user for Kerberos auth
+      userInfoPB.setEffectiveUser(ugi.getUserName());
+    } else if (authMethod == AuthMethod.SIMPLE) {
+      // Send both effective user and real user for simple auth
+      userInfoPB.setEffectiveUser(ugi.getUserName());
+      if (ugi.getRealUser() != null) {
+        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+      }
+    }
+    return userInfoPB.build();
+  }
+
+  /**
+   * Create connection preamble
+   * @param byteBuf to write to
+   * @param authMethod to write
+   */
+  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
+    byteBuf.writeBytes(HConstants.RPC_HEADER);
+    byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
+    byteBuf.writeByte(authMethod.code);
+  }
+
+  private void close0(Throwable e) {
+    List<AsyncCall> toCleanup;
+    synchronized (pendingCalls) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+      pendingCalls.clear();
+    }
+    IOException closeException = null;
+    if (e != null) {
+      if (e instanceof IOException) {
+        closeException = (IOException) e;
+      } else {
+        closeException = new IOException(e);
+      }
+    }
+    // log the info
+    if (LOG.isDebugEnabled() && closeException != null) {
+      LOG.debug(name + ": closing ipc connection to " + address, 
closeException);
+    }
+    if (cleanupTimer != null) {
+      cleanupTimer.cancel();
+      cleanupTimer = null;
+    }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(closeException != null ? closeException
+          : new ConnectionClosingException(
+              "Call id=" + call.id + " on server " + address + " aborted: 
connection is closing"));
+    }
+    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(name + ": closed");
+    }
+  }
+
+  /**
+   * Close connection
+   * @param e exception on close
+   */
+  public void close(final Throwable e) {
+    client.removeConnection(this);
+
+    // Move closing from the requesting thread to the channel thread
+    if (channel.eventLoop().inEventLoop()) {
+      close0(e);
+    } else {
+      channel.eventLoop().execute(new Runnable() {
+        @Override
+        public void run() {
+          close0(e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Clean up calls.
+   */
+  private void cleanupCalls() {
+    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    long nextCleanupTaskDelay = -1L;
+    synchronized (pendingCalls) {
+      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); 
iter.hasNext();) {
+        AsyncCall call = iter.next();
+        long timeout = call.getRpcTimeout();
+        if (timeout > 0) {
+          if (currentTime - call.getStartTime() >= timeout) {
+            iter.remove();
+            toCleanup.add(call);
+          } else {
+            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
+              nextCleanupTaskDelay = timeout;
+            }
+          }
+        }
+      }
+      if (nextCleanupTaskDelay > 0) {
+        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, 
TimeUnit.MILLISECONDS);
+      } else {
+        cleanupTimer = null;
+      }
+    }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", 
waitTime="
+          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + 
call.getRpcTimeout()));
+    }
+  }
+
+  /**
+   * Check if the connection is alive
+   * @return true if alive
+   */
+  public boolean isAlive() {
+    return channel.isOpen();
+  }
+
+  @Override
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /**
+   * Check if user should authenticate over Kerberos
+   * @return true if should be authenticated over Kerberos
+   * @throws java.io.IOException on failure of check
+   */
+  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUser = currentUser.getRealUser();
+    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
+      // Make sure user logged in using Kerberos either keytab or TGT
+      loginUser.hasKerberosCredentials() &&
+      // relogin only in case it is the login user (e.g. JT)
+      // or superuser (like oozie).
+      (loginUser.equals(currentUser) || loginUser.equals(realUser));
+  }
+
+  /**
+   * If multiple clients with the same principal try to connect to the same 
server at the same time,
+   * the server assumes a replay attack is in progress. This is a feature of 
kerberos. In order to
+   * work around this, what is done is that the client backs off randomly and 
tries to initiate the
+   * connection again. The other problem is to do with ticket expiry. To 
handle that, a relogin is
+   * attempted.
+   * <p>
+   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} 
method. In case when the
+   * user doesn't have valid credentials, we don't need to retry (from cache 
or ticket). In such
+   * cases, it is prudent to throw a runtime exception when we receive a 
SaslException from the
+   * underlying authentication implementation, so there is no retry from other 
high level (for eg,
+   * HCM or HBaseAdmin).
+   * </p>
+   * @param currRetries retry count
+   * @param ex exception describing fail
+   * @param user which is trying to connect
+   * @throws java.io.IOException if IO fail
+   * @throws InterruptedException if thread is interrupted
+   */
+  private void handleSaslConnectionFailure(final int currRetries, final 
Throwable ex,
+      final UserGroupInformation user) throws IOException, 
InterruptedException {
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws IOException, InterruptedException {
+        if (shouldAuthenticateOverKrb()) {
+          if (currRetries < MAX_SASL_RETRIES) {
+            LOG.debug("Exception encountered while connecting to the server : 
" + ex);
+            // try re-login
+            if (UserGroupInformation.isLoginKeytabBased()) {
+              UserGroupInformation.getLoginUser().reloginFromKeytab();
+            } else {
+              UserGroupInformation.getLoginUser().reloginFromTicketCache();
+            }
+
+            // Should reconnect
+            return null;
+          } else {
+            String msg = "Couldn't setup connection for "
+                + UserGroupInformation.getLoginUser().getUserName() + " to " + 
serverPrincipal;
+            LOG.warn(msg);
+            throw (IOException) new IOException(msg).initCause(ex);
+          }
+        } else {
+          LOG.warn("Exception encountered while connecting to " + "the server 
: " + ex);
+        }
+        if (ex instanceof RemoteException) {
+          throw (RemoteException) ex;
+        }
+        if (ex instanceof SaslException) {
+          String msg = "SASL authentication failed."
+              + " The most likely cause is missing or invalid credentials." + 
" Consider 'kinit'.";
+          LOG.fatal(msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+        throw new IOException(ex);
+      }
+    });
+  }
+
+  public int getConnectionHashCode() {
+    return ConnectionId.hashCode(ticket, serviceName, address);
+  }
+
+  @Override
+  public int hashCode() {
+    return getConnectionHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof AsyncRpcChannelImpl) {
+      AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj;
+      return channel.hashCode() == obj.hashCode();
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return this.address.toString() + "/" + this.serviceName + "/" + 
this.ticket;
+  }
+
+  /**
+   * Listens to call writes and fails if write failed
+   */
+  private static final class CallWriteListener implements 
ChannelFutureListener {
+    private final AsyncRpcChannelImpl rpcChannel;
+    private final int id;
+
+    public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) {
+      this.rpcChannel = asyncRpcChannelImpl;
+      this.id = id;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess()) {
+        AsyncCall call = rpcChannel.removePendingCall(id);
+        if (call != null) {
+          if (future.cause() instanceof IOException) {
+            call.setFailed((IOException) future.cause());
+          } else {
+            call.setFailed(new IOException(future.cause()));
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index c2bd457..8d9a5b3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -388,11 +388,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
       }
       rpcChannel = connections.get(hashCode);
       if (rpcChannel != null && !rpcChannel.isAlive()) {
-        LOG.debug("Removing dead channel from 
server="+rpcChannel.address.toString());
+        LOG.debug("Removing dead channel from 
server="+rpcChannel.getAddress().toString());
         connections.remove(hashCode);
       }
       if (rpcChannel == null) {
-        rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, 
serviceName, location);
+        rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, 
serviceName, location);
         connections.put(hashCode, rpcChannel);
       }
     }
@@ -415,8 +415,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
     synchronized (connections) {
       for (AsyncRpcChannel rpcChannel : connections.values()) {
         if (rpcChannel.isAlive() &&
-            rpcChannel.address.getPort() == sn.getPort() &&
-            rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
+            rpcChannel.getAddress().getPort() == sn.getPort() &&
+            
rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) {
           LOG.info("The server on " + sn.toString() +
               " is dead - stopping the connection " + rpcChannel.toString());
           rpcChannel.close(null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index e0c7586..a0928b1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -37,13 +37,13 @@ import io.netty.channel.SimpleChannelInboundHandler;
  */
 @InterfaceAudience.Private
 public class AsyncServerResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-  private final AsyncRpcChannel channel;
+  private final AsyncRpcChannelImpl channel;
 
   /**
    * Constructor
    * @param channel on which this response handler operates
    */
-  public AsyncServerResponseHandler(AsyncRpcChannel channel) {
+  public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) {
     this.channel = channel;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
index b1d54a4..45c3700 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,68 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-
 import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
- * Base class which provides clients with an RPC connection to
+ * Base interface which provides clients with an RPC connection to
  * call coprocessor endpoint {@link com.google.protobuf.Service}s.
  * Note that clients should not use this class directly, except through
- * {@link 
org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}.
+ * {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class CoprocessorRpcChannel implements RpcChannel, 
BlockingRpcChannel {
-  private static final Log LOG = 
LogFactory.getLog(CoprocessorRpcChannel.class);
-
-  @Override
-  @InterfaceAudience.Private
-  public void callMethod(Descriptors.MethodDescriptor method,
-                         RpcController controller,
-                         Message request, Message responsePrototype,
-                         RpcCallback<Message> callback) {
-    Message response = null;
-    try {
-      response = callExecService(controller, method, request, 
responsePrototype);
-    } catch (IOException ioe) {
-      LOG.warn("Call failed on IOException", ioe);
-      ResponseConverter.setControllerException(controller, ioe);
-    }
-    if (callback != null) {
-      callback.run(response);
-    }
-  }
-
-  @Override
-  @InterfaceAudience.Private
-  public Message callBlockingMethod(Descriptors.MethodDescriptor method,
-                                    RpcController controller,
-                                    Message request, Message responsePrototype)
-      throws ServiceException {
-    try {
-      return callExecService(controller, method, request, responsePrototype);
-    } catch (IOException ioe) {
-      throw new ServiceException("Error calling method "+method.getFullName(), 
ioe);
-    }
-  }
-
-  protected abstract Message callExecService(RpcController controller,
-      Descriptors.MethodDescriptor method, Message request, Message 
responsePrototype)
-          throws IOException;
-}
+public interface CoprocessorRpcChannel extends RpcChannel, BlockingRpcChannel {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index ac6a022..68798ed 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -42,7 +42,7 @@ import com.google.protobuf.RpcController;
  * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()
  */
 @InterfaceAudience.Private
-public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{
+public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
   private static final Log LOG = 
LogFactory.getLog(MasterCoprocessorRpcChannel.class);
 
   private final ClusterConnection connection;

http://git-wip-us.apache.org/repos/asf/hbase/blob/56358a0f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 0f06af1..4d3a453 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -45,7 +45,7 @@ import com.google.protobuf.RpcController;
  * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
  */
 @InterfaceAudience.Private
-public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
+public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
   private static final Log LOG = 
LogFactory.getLog(RegionCoprocessorRpcChannel.class);
 
   private final ClusterConnection connection;
@@ -57,6 +57,12 @@ public class RegionCoprocessorRpcChannel extends 
CoprocessorRpcChannel{
   private RpcRetryingCallerFactory rpcCallerFactory;
   private RpcControllerFactory rpcControllerFactory;
 
+  /**
+   * Constructor
+   * @param conn connection to use
+   * @param table to connect to
+   * @param row to locate region with
+   */
   public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, 
byte[] row) {
     this.connection = conn;
     this.table = table;
@@ -114,6 +120,10 @@ public class RegionCoprocessorRpcChannel extends 
CoprocessorRpcChannel{
     return response;
   }
 
+  /**
+   * Get last region this RpcChannel communicated with
+   * @return region name as byte array
+   */
   public byte[] getLastRegion() {
     return lastRegion;
   }

Reply via email to