Repository: hbase
Updated Branches:
  refs/heads/master 4127fd2a7 -> 3fe750829


HBASE-15921 Add first AsyncTable impl and create TableImpl based on it


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3fe75082
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3fe75082
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3fe75082

Branch: refs/heads/master
Commit: 3fe7508295255c44dcdadfde460e4be5f829cb63
Parents: 4127fd2
Author: zhangduo <zhang...@apache.org>
Authored: Fri Oct 14 18:38:02 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Oct 14 22:52:52 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncConnection.java    |  62 +++++
 .../client/AsyncConnectionConfiguration.java    | 106 +++++++++
 .../hbase/client/AsyncConnectionImpl.java       | 144 ++++++++++++
 .../hadoop/hbase/client/AsyncRegionLocator.java |  64 ++++++
 .../client/AsyncRpcRetryingCallerFactory.java   | 106 +++++++++
 .../AsyncSingleRequestRpcRetryingCaller.java    | 229 +++++++++++++++++++
 .../apache/hadoop/hbase/client/AsyncTable.java  | 126 ++++++++++
 .../hadoop/hbase/client/AsyncTableImpl.java     | 192 ++++++++++++++++
 .../hbase/client/AsyncTableRegionLocator.java   |  60 +++++
 .../client/AsyncTableRegionLocatorImpl.java     |  50 ++++
 .../hadoop/hbase/client/ClusterRegistry.java    |  41 ++++
 .../hbase/client/ClusterRegistryFactory.java    |  43 ++++
 .../hbase/client/ConnectionImplementation.java  |  53 ++---
 .../hadoop/hbase/client/ConnectionUtils.java    |  35 +++
 .../hbase/client/RpcRetryingCallerImpl.java     |  11 +-
 .../hadoop/hbase/client/ZKClusterRegistry.java  |  78 +++++++
 .../hadoop/hbase/util/CollectionUtils.java      |  24 ++
 .../hadoop/hbase/util/ReflectionUtils.java      |   4 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java | 196 ++++++++++++++++
 .../hadoop/hbase/client/TestAsyncTable.java     | 132 +++++++++++
 20 files changed, 1712 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
new file mode 100644
index 0000000..6dc0300
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The asynchronous version of Connection.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncConnection extends Closeable {
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Retrieve an AsyncTableRegionLocator implementation to inspect region information on a
+   * table. The returned AsyncTableRegionLocator is not thread-safe, so a new instance should
+   * be created for each using thread. This is a lightweight operation. Pooling or caching of
+   * the returned AsyncTableRegionLocator is neither required nor desired.
+   * @param tableName Name of the table whose region is to be examined
+   * @return An AsyncTableRegionLocator instance
+   */
+  AsyncTableRegionLocator getRegionLocator(TableName tableName);
+
+  /**
+   * Retrieve an AsyncTable implementation for accessing a table. The returned AsyncTable is
+   * not thread safe; a new instance should be created for each using thread. This is a
+   * lightweight operation; pooling or caching of the returned AsyncTable is neither required
+   * nor desired.
+   * <p>
+   * This method no longer checks table existence. If the table does not exist, an exception
+   * will only be thrown when the first operation is attempted.
+   * @param tableName the name of the table
+   * @return an AsyncTable to use for interactions with this table
+   */
+  AsyncTable getTable(TableName tableName);
+}
\ No newline at end of file

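A minimal usage sketch of the new interface. This is hypothetical wiring: AsyncConnectionImpl
(below) is package-private and the patch adds no public factory yet, so construction of the
connection is assumed to happen inside org.apache.hadoop.hbase.client.

    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection asyncConn = new AsyncConnectionImpl(conf, User.getCurrent())) {
      AsyncTable table = asyncConn.getTable(TableName.valueOf("test"));
      // Every operation returns a CompletableFuture; join() blocks only for this demo.
      Result result = table.get(new Get(Bytes.toBytes("row"))).join();
      System.out.println(Bytes.toStringBinary(result.getRow()));
    }
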
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
new file mode 100644
index 0000000..ba2e660
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
+import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Timeout configs.
+ */
+@InterfaceAudience.Private
+class AsyncConnectionConfiguration {
+
+  private final long metaOperationTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  // timeout for each read rpc request
+  private final long readRpcTimeoutNs;
+
+  // timeout for each write rpc request
+  private final long writeRpcTimeoutNs;
+
+  private final long pauseNs;
+
+  private final int maxRetries;
+
+  /** How many retries are allowed before we start to log */
+  private final int startLogErrorsCnt;
+
+  AsyncConnectionConfiguration(Configuration conf) {
+    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+      conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+      conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
+      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+    this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
+      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+    this.pauseNs = TimeUnit.MILLISECONDS
+        .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+    this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
+      DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+  }
+
+  long getMetaOperationTimeoutNs() {
+    return metaOperationTimeoutNs;
+  }
+
+  long getOperationTimeoutNs() {
+    return operationTimeoutNs;
+  }
+
+  long getReadRpcTimeoutNs() {
+    return readRpcTimeoutNs;
+  }
+
+  long getWriteRpcTimeoutNs() {
+    return writeRpcTimeoutNs;
+  }
+
+  long getPauseNs() {
+    return pauseNs;
+  }
+
+  int getMaxRetries() {
+    return maxRetries;
+  }
+
+  int getStartLogErrorsCnt() {
+    return startLogErrorsCnt;
+  }
+
+}

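The nanosecond fields above are derived from millisecond-valued configuration keys (the
standard HConstants names). A small illustrative sketch of the mapping:

    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.client.operation.timeout", 30000); // configured in milliseconds
    conf.setLong("hbase.rpc.read.timeout", 10000);
    AsyncConnectionConfiguration connConf = new AsyncConnectionConfiguration(conf);
    // Stored internally in nanoseconds:
    assert connConf.getOperationTimeoutNs() == TimeUnit.MILLISECONDS.toNanos(30000);
    assert connConf.getReadRpcTimeoutNs() == TimeUnit.MILLISECONDS.toNanos(10000);
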
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
new file mode 100644
index 0000000..c50e244
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * The implementation of AsyncConnection.
+ */
+@InterfaceAudience.Private
+class AsyncConnectionImpl implements AsyncConnection {
+
+  private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class);
+
+  private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
+      Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+
+  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
+
+  private final Configuration conf;
+
+  final AsyncConnectionConfiguration connConf;
+
+  private final User user;
+
+  private final ClusterRegistry registry;
+
+  private final String clusterId;
+
+  private final int rpcTimeout;
+
+  private final RpcClient rpcClient;
+
+  final RpcControllerFactory rpcControllerFactory;
+
+  private final boolean hostnameCanChange;
+
+  private final AsyncRegionLocator locator;
+
+  final AsyncRpcRetryingCallerFactory callerFactory;
+
+  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
+
+  public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
+    this.conf = conf;
+    this.user = user;
+
+    this.connConf = new AsyncConnectionConfiguration(conf);
+
+    this.locator = new AsyncRegionLocator(conf);
+
+    // the actions below will not throw exceptions, so there is no need to catch and close.
+    this.registry = ClusterRegistryFactory.getRegistry(conf);
+    this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
+      }
+      return CLUSTER_ID_DEFAULT;
+    });
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
+    this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(locator);
+    IOUtils.closeQuietly(rpcClient);
+    IOUtils.closeQuietly(registry);
+  }
+
+  @Override
+  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
+    return new AsyncTableRegionLocatorImpl(tableName, locator);
+  }
+
+  // we will override this method for testing retry caller, so do not remove this method.
+  AsyncRegionLocator getLocator() {
+    return locator;
+  }
+
+  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
+    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
+  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
+    return CollectionUtils.computeIfAbsentEx(rsStubs,
+      getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
+      () -> createRegionServerStub(serverName));
+  }
+
+  @Override
+  public AsyncTable getTable(TableName tableName) {
+    return new AsyncTableImpl(this, tableName);
+  }
+}
\ No newline at end of file

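getRegionServerStub caches one stub per region server. A plain Map.computeIfAbsent cannot be
used here because stub creation throws the checked IOException, which a
java.util.function.Function cannot propagate; that is the motivation for the new
CollectionUtils.computeIfAbsentEx helper. A standalone sketch of the equivalent pattern (the
stubFor name below is illustrative, not part of the patch):

    private final ConcurrentMap<String, ClientService.Interface> stubs = new ConcurrentHashMap<>();

    ClientService.Interface stubFor(String key, ServerName serverName) throws IOException {
      ClientService.Interface stub = stubs.get(key);
      if (stub != null) {
        return stub;
      }
      // Creation may throw IOException, so a computeIfAbsent lambda cannot be used directly.
      ClientService.Interface created = createRegionServerStub(serverName);
      ClientService.Interface existing = stubs.putIfAbsent(key, created);
      return existing != null ? existing : created;
    }
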
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
new file mode 100644
index 0000000..dc75ba6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * TODO: reimplement using the async connection when the scan logic is ready. The current
+ * implementation is based on the blocking client.
+ */
+@InterfaceAudience.Private
+class AsyncRegionLocator implements Closeable {
+
+  private final ConnectionImplementation conn;
+
+  AsyncRegionLocator(Configuration conf) throws IOException {
+    conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      boolean reload) {
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    try {
+      future.complete(conn.getRegionLocation(tableName, row, reload));
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
+      ServerName source) {
+    conn.updateCachedLocations(tableName, regionName, row, exception, source);
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(conn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
new file mode 100644
index 0000000..c5ac9a5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory to create an async rpc retrying caller.
+ */
+@InterfaceAudience.Private
+class AsyncRpcRetryingCallerFactory {
+
+  private final AsyncConnectionImpl conn;
+
+  private final HashedWheelTimer retryTimer;
+
+  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+    this.conn = conn;
+    this.retryTimer = retryTimer;
+  }
+
+  public class SingleRequestCallerBuilder<T> {
+
+    private TableName tableName;
+
+    private byte[] row;
+
+    private AsyncSingleRequestRpcRetryingCaller.Callable<T> callable;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public SingleRequestCallerBuilder<T> table(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> row(byte[] row) {
+      this.row = row;
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> action(
+        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+      this.callable = callable;
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public AsyncSingleRequestRpcRetryingCaller<T> build() {
+      return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
+          Preconditions.checkNotNull(tableName, "tableName is null"),
+          Preconditions.checkNotNull(row, "row is null"),
+          Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
+          conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
+          conn.connConf.getStartLogErrorsCnt());
+    }
+
+    /**
+     * Shortcut for {@code build().call()}
+     */
+    public CompletableFuture<T> call() {
+      return build().call();
+    }
+  }
+
+  /**
+   * Create retry caller for single action, such as get, put, delete, etc.
+   */
+  public <T> SingleRequestCallerBuilder<T> single() {
+    return new SingleRequestCallerBuilder<>();
+  }
+}

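The builder reads naturally at the call site. A sketch of how a caller is assembled (conn
refers to an AsyncConnectionImpl; the doGet callable is a placeholder, and the real callable
bodies appear in AsyncTableImpl later in this patch):

    CompletableFuture<Result> future = conn.callerFactory.<Result> single()
        .table(tableName)
        .row(row)
        .operationTimeout(30, TimeUnit.SECONDS)
        .rpcTimeout(1, TimeUnit.SECONDS)
        // The callable issues the actual rpc against the located region server.
        .action((controller, loc, stub) -> doGet(controller, loc, stub))
        .call();
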
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
new file mode 100644
index 0000000..8acde94
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Retry caller for a single request, such as get, put, delete, etc.
+ */
+@InterfaceAudience.Private
+class AsyncSingleRequestRpcRetryingCaller<T> {
+
+  private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
+
+  @FunctionalInterface
+  public interface Callable<T> {
+    CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
+        ClientService.Interface stub);
+  }
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final byte[] row;
+
+  private final Callable<T> callable;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final CompletableFuture<T> future;
+
+  private final HBaseRpcController controller;
+
+  private final List<RetriesExhaustedException.ThrowableWithExtraContext> 
exceptions;
+
+  private final long startNs;
+
+  public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.row = row;
+    this.callable = callable;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+    this.future = new CompletableFuture<>();
+    this.controller = conn.rpcControllerFactory.newController();
+    this.exceptions = new ArrayList<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private int tries = 1;
+
+  private long elapsedMs() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+  }
+
+  private static Throwable translateException(Throwable t) {
+    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException) t).unwrapRemoteException();
+    }
+    if (t instanceof ServiceException && t.getCause() != null) {
+      t = translateException(t.getCause());
+    }
+    return t;
+  }
+
+  private void completeExceptionally() {
+    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+  }
+
+  private void onError(Throwable error, Supplier<String> errMsg,
+      Consumer<Throwable> updateCachedLocation) {
+    error = translateException(error);
+    if (tries > startLogErrorsCnt) {
+      LOG.warn(errMsg.get(), error);
+    }
+    RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
+        error, EnvironmentEdgeManager.currentTime(), "");
+    exceptions.add(qt);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      completeExceptionally();
+      return;
+    }
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs);
+      if (maxDelayNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    updateCachedLocation.accept(error);
+    tries++;
+    retryTimer.newTimeout(new TimerTask() {
+
+      @Override
+      public void run(Timeout timeout) throws Exception {
+        // always restart from beginning.
+        locateThenCall();
+      }
+    }, delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void resetController() {
+    controller.reset();
+    if (rpcTimeoutNs >= 0) {
+      controller.setCallTimeout(
+        (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
+    }
+  }
+
+  private void call(HRegionLocation loc) {
+    ClientService.Interface stub;
+    try {
+      stub = conn.getRegionServerStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async stub to " + loc.getServerName() + " for '" + 
Bytes.toStringBinary(row)
+            + "' in " + loc.getRegionInfo().getEncodedName() + " of " + 
tableName
+            + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + 
", timeout = "
+            + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time 
elapsed = "
+            + elapsedMs() + " ms",
+        err -> conn.getLocator().updateCachedLocations(tableName,
+          loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+      return;
+    }
+    resetController();
+    callable.call(controller, loc, stub).whenComplete((result, error) -> {
+      if (error != null) {
+        onError(error,
+          () -> "Call to " + loc.getServerName() + " for '" + 
Bytes.toStringBinary(row) + "' in "
+              + loc.getRegionInfo().getEncodedName() + " of " + tableName + " 
failed, tries = "
+              + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+              + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time 
elapsed = "
+              + elapsedMs() + " ms",
+          err -> conn.getLocator().updateCachedLocations(tableName,
+            loc.getRegionInfo().getRegionName(), row, err, 
loc.getServerName()));
+        return;
+      }
+      future.complete(result);
+    });
+  }
+
+  private void locateThenCall() {
+    conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
+      if (error != null) {
+        onError(error,
+          () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
+              + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+              + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+              + elapsedMs() + " ms",
+          err -> {
+          });
+        return;
+      }
+      call(loc);
+    });
+  }
+
+  public CompletableFuture<T> call() {
+    locateThenCall();
+    return future;
+  }
+}

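The delay computation in onError caps the exponential backoff by the remaining
operation-timeout budget. A worked sketch of that calculation (the concrete numbers are only
illustrative; getPauseTime is the existing ConnectionUtils helper statically imported above):

    long operationTimeoutNs = TimeUnit.SECONDS.toNanos(30);
    long pauseNs = TimeUnit.MILLISECONDS.toNanos(100);
    long startNs = System.nanoTime();
    int tries = 3;
    long remainingNs = operationTimeoutNs - (System.nanoTime() - startNs);
    if (remainingNs <= 0) {
      // Budget already spent: complete the future with RetriesExhaustedException now.
    } else {
      // Back off, but never sleep past the operation deadline.
      long delayNs = Math.min(remainingNs, getPauseTime(pauseNs, tries - 1));
      // Schedule locateThenCall() on the HashedWheelTimer after delayNs.
    }
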
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
new file mode 100644
index 0000000..c4e7cec
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
+ * <p>
+ * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
+ * concurrently.
+ * <p>
+ * Usually the implementations will not throw any exception directly; you need to get the
+ * exception from the returned {@link CompletableFuture}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncTable {
+
+  /**
+   * Gets the fully qualified table name instance of this table.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Set the timeout of each rpc read request in operations of this Table instance. It
+   * overrides the value of {@code hbase.rpc.read.timeout} in the configuration. If an rpc
+   * read request waits too long, the client will stop waiting and send a new request to
+   * retry, until the retries are exhausted or the operation timeout is reached.
+   */
+  void setReadRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each rpc read request in this Table instance.
+   */
+  long getReadRpcTimeout(TimeUnit unit);
+
+  /**
+   * Set the timeout of each rpc write request in operations of this Table instance. It
+   * overrides the value of {@code hbase.rpc.write.timeout} in the configuration. If an rpc
+   * write request waits too long, the client will stop waiting and send a new request to
+   * retry, until the retries are exhausted or the operation timeout is reached.
+   */
+  void setWriteRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each rpc write request in this Table instance.
+   */
+  long getWriteRpcTimeout(TimeUnit unit);
+
+  /**
+   * Set the timeout of each operation in this Table instance. It overrides the value of
+   * {@code hbase.client.operation.timeout} in the configuration.
+   * <p>
+   * The operation timeout is a top-level restriction that ensures an operation will not be
+   * blocked for longer than this. Within each operation, if an rpc request fails because of
+   * a timeout or another reason, the client will retry until it succeeds or a
+   * RetriesExhaustedException is thrown. But if the total time elapsed reaches the operation
+   * timeout before the retries are exhausted, it will break early and throw
+   * SocketTimeoutException.
+   */
+  void setOperationTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each operation in Table instance.
+   */
+  long getOperationTimeout(TimeUnit unit);
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Get.
+   * <p>
+   * This will return true if the Get matches one or more keys, false if not.
+   * <p>
+   * This is a server-side call, so it prevents any data from being transferred to the client.
+   */
+  CompletableFuture<Boolean> exists(Get get);
+
+  /**
+   * Extracts certain cells from a given row.
+   * <p>
+   * Return the data coming from the specified row, if it exists. If the row specified doesn't
+   * exist, the {@link Result} instance returned won't contain any
+   * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
+   * @param get The object that specifies what data to fetch and from which row.
+   */
+  CompletableFuture<Result> get(Get get);
+
+  /**
+   * Puts some data to the table.
+   * @param put The data to put.
+   */
+  CompletableFuture<Void> put(Put put);
+
+  /**
+   * Deletes the specified cells/row.
+   * @param delete The object that specifies what to delete.
+   */
+  CompletableFuture<Void> delete(Delete delete);
+}

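Because every method returns a CompletableFuture, operations compose without blocking. A
hypothetical chain (asyncConn, row, family, qualifier and value are assumed to exist):

    AsyncTable table = asyncConn.getTable(TableName.valueOf("test"));
    table.put(new Put(row).addColumn(family, qualifier, value))
        .thenCompose(ignored -> table.get(new Get(row)))
        .whenComplete((result, error) -> {
          if (error != null) {
            // Failures surface through the future, not as thrown exceptions.
            error.printStackTrace();
          } else {
            System.out.println(Bytes.toStringBinary(result.getValue(family, qualifier)));
          }
        });
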
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
new file mode 100644
index 0000000..cbb4988
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * The implementation of AsyncTable.
+ */
+@InterfaceAudience.Private
+class AsyncTableImpl implements AsyncTable {
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private long readRpcTimeoutNs;
+
+  private long writeRpcTimeoutNs;
+
+  private long operationTimeoutNs;
+
+  public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
+    this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
+    this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
+        : conn.connConf.getOperationTimeoutNs();
+  }
+
+  @Override
+  public TableName getName() {
+    return tableName;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conn.getConfiguration();
+  }
+
+  @FunctionalInterface
+  private interface Converter<D, I, S> {
+    D convert(I info, S src) throws IOException;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP, REQ> {
+    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
+        RpcCallback<RESP> done);
+  }
+
+  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
+      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
+      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    try {
+      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
+        new RpcCallback<PRESP>() {
+
+          @Override
+          public void run(PRESP resp) {
+            if (controller.failed()) {
+              future.completeExceptionally(controller.getFailed());
+            } else {
+              try {
+                future.complete(respConverter.convert(controller, resp));
+              } catch (IOException e) {
+                future.completeExceptionally(e);
+              }
+            }
+          }
+        });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
+    return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public CompletableFuture<Boolean> exists(Get get) {
+    if (!get.isCheckExistenceOnly()) {
+      get = ReflectionUtils.newInstance(get.getClass(), get);
+      get.setCheckExistenceOnly(true);
+    }
+    return get(get).thenApply(r -> r.getExists());
+  }
+
+  @Override
+  public CompletableFuture<Result> get(Get get) {
+    return this.<Result> newCaller(get, readRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl
+            .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+              RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+              (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Void> put(Put put) {
+    return this.<Void> newCaller(put, writeRpcTimeoutNs)
+        .action(
+          (controller, loc, stub) -> AsyncTableImpl.<Put, MutateRequest, MutateResponse, Void> call(
+            controller, loc, stub, put, RequestConverter::buildMutateRequest,
+            (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> {
+              return null;
+            }))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Void> delete(Delete delete) {
+    return this.<Void> newCaller(delete, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl
+            .<Delete, MutateRequest, MutateResponse, Void> call(controller, loc, stub, delete,
+              RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done),
+              (c, resp) -> {
+                return null;
+              }))
+        .call();
+  }
+
+  @Override
+  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
+    this.readRpcTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getReadRpcTimeout(TimeUnit unit) {
+    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
+    this.writeRpcTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getWriteRpcTimeout(TimeUnit unit) {
+    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public void setOperationTimeout(long timeout, TimeUnit unit) {
+    this.operationTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getOperationTimeout(TimeUnit unit) {
+    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+}
\ No newline at end of file

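Since the timeout fields of AsyncTableImpl are plain instance state seeded from
AsyncConnectionConfiguration, they can be tuned per table handle after creation. A brief
sketch (the values are arbitrary):

    AsyncTable table = asyncConn.getTable(tableName);
    // Applies only to this (non-thread-safe) handle, not to the whole connection.
    table.setOperationTimeout(60, TimeUnit.SECONDS);
    table.setReadRpcTimeout(5, TimeUnit.SECONDS);
    table.setWriteRpcTimeout(5, TimeUnit.SECONDS);
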
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
new file mode 100644
index 0000000..989e8d9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The asynchronous version of RegionLocator.
+ * <p>
+ * Usually the implementations will not throw any exception directly; you need to get the
+ * exception from the returned {@link CompletableFuture}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncTableRegionLocator {
+
+  /**
+   * Gets the fully qualified table name instance of the table whose region we want to locate.
+   */
+  TableName getName();
+
+  /**
+   * Finds the region on which the given row is being served. Does not reload the cache.
+   * <p>
+   * Returns the location of the region to which the row belongs.
+   * @param row Row to find.
+   */
+  default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row) {
+    return getRegionLocation(row, false);
+  }
+
+  /**
+   * Finds the region on which the given row is being served.
+   * <p>
+   * Returns the location of the region to which the row belongs.
+   * @param row Row to find.
+   * @param reload true to reload information or false to use cached information
+   */
+  CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
+}

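A brief usage sketch (assumes an AsyncConnection named asyncConn and an existing table):

    AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.valueOf("test"));
    locator.getRegionLocation(Bytes.toBytes("row")).thenAccept(
      loc -> System.out.println("row is served by " + loc.getServerName()));
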
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
new file mode 100644
index 0000000..d715e24
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The implementation of AsyncTableRegionLocator.
+ */
+@InterfaceAudience.Private
+class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
+
+  private final TableName tableName;
+
+  private final AsyncRegionLocator locator;
+
+  public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) {
+    this.tableName = tableName;
+    this.locator = locator;
+  }
+
+  @Override
+  public TableName getName() {
+    return tableName;
+  }
+
+  @Override
+  public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
+    return locator.getRegionLocation(tableName, row, reload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
new file mode 100644
index 0000000..c1918a7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Implementations hold cluster information such as this cluster's id.
+ * <p>
+ * Internal use only.
+ */
+@InterfaceAudience.Private
+interface ClusterRegistry extends Closeable {
+
+  /**
+   * Should only be called once.
+   * <p>
+   * The upper layer should store this value somewhere, as it will not change any more.
+   */
+  String getClusterId();
+
+  @Override
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
new file mode 100644
index 0000000..a6b3e39
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Get instance of configured Registry.
+ */
+@InterfaceAudience.Private
+final class ClusterRegistryFactory {
+
+  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
+
+  private ClusterRegistryFactory() {
+  }
+
+  /**
+   * @return The cluster registry implementation to use.
+   */
+  static ClusterRegistry getRegistry(Configuration conf) {
+    Class<? extends ClusterRegistry> clazz =
+        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}
\ No newline at end of file

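Because the implementation class is resolved from configuration, a custom registry can be
plugged in without code changes. A hypothetical example (MyClusterRegistry is an assumed
class; note that ClusterRegistry is package-private, so such an implementation would also
have to live in org.apache.hadoop.hbase.client):

    Configuration conf = HBaseConfiguration.create();
    // Default is ZKClusterRegistry; override via hbase.client.registry.impl.
    conf.setClass("hbase.client.registry.impl", MyClusterRegistry.class, ClusterRegistry.class);
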
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 9cf63dc..ae8c57e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -18,14 +18,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -38,8 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +63,11 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -79,8 +84,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -92,11 +95,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import edu.umd.cs.findbugs.annotations.Nullable;
 
 /**
  * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
@@ -196,7 +195,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
       HConstants.DEFAULT_USE_META_REPLICAS);
     // how many times to try, one more than max *retry* time
-    this.numTries = connectionConfig.getRetriesNumber() + 1;
+    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
     this.rpcTimeout = conf.getInt(
         HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@@ -1094,8 +1093,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
           throw new MasterNotRunningException(sn + " is dead.");
         }
         // Use the security info interface name as our stub key
-        String key = getStubKey(getServiceName(),
-            sn.getHostname(), sn.getPort(), hostnamesCanChange);
+        String key = getStubKey(getServiceName(), sn, hostnamesCanChange);
         connectionLock.putIfAbsent(key, key);
         Object stub = null;
         synchronized (connectionLock.get(key)) {
@@ -1176,8 +1174,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (isDeadServer(serverName)) {
       throw new RegionServerStoppedException(serverName + " is dead.");
     }
-    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(),
-      serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
+    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
+      this.hostnamesCanChange);
     this.connectionLock.putIfAbsent(key, key);
     AdminProtos.AdminService.BlockingInterface stub;
     synchronized (this.connectionLock.get(key)) {
@@ -1198,8 +1196,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (isDeadServer(sn)) {
       throw new RegionServerStoppedException(sn + " is dead.");
     }
-    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(),
-      sn.getPort(), this.hostnamesCanChange);
+    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn,
+      this.hostnamesCanChange);
     this.connectionLock.putIfAbsent(key, key);
     ClientProtos.ClientService.BlockingInterface stub = null;
     synchronized (this.connectionLock.get(key)) {
@@ -1215,25 +1213,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return stub;
   }
 
-  static String getStubKey(final String serviceName,
-                           final String rsHostname,
-                           int port,
-                           boolean resolveHostnames) {
-    // Sometimes, servers go down and they come back up with the same hostname but a different
-    // IP address. Force a resolution of the rsHostname by trying to instantiate an
-    // InetSocketAddress, and this way we will rightfully get a new stubKey.
-    // Also, include the hostname in the key so as to take care of those cases where the
-    // DNS name is different but IP address remains the same.
-    // DNS name is different but IP address remains the same.
-    String address = rsHostname;
-    if (resolveHostnames) {
-      InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
-      if (i != null) {
-        address = i.getHostAddress() + "-" + rsHostname;
-      }
-    }
-    return serviceName + "@" + address + ":" + port;
-  }
-
   private ZooKeeperKeepAliveConnection keepAliveZookeeper;
   private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 83655f0..2f5d2b1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.client;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -40,6 +43,8 @@ import org.apache.hadoop.hbase.security.UserProvider;
 @InterfaceAudience.Private
 public final class ConnectionUtils {
 
+  private static final Log LOG = LogFactory.getLog(ConnectionUtils.class);
+
   private ConnectionUtils() {}
 
   /**
@@ -167,4 +172,34 @@ public final class ConnectionUtils {
       return false;
     }
   }
+
+  /**
+   * Return retries + 1. The returned value will be in the range [1, Integer.MAX_VALUE], with no
+   * overflow when retries is already Integer.MAX_VALUE.
+   */
+  static int retries2Attempts(int retries) {
+    return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
+  }
+
+  /**
+   * Get a unique key for the rpc stub to the given server.
+   */
+  static String getStubKey(String serviceName, ServerName serverName,
+      boolean hostnameCanChange) {
+    // Sometimes, servers go down and they come back up with the same hostname but a different
+    // IP address. Force a resolution of the hostname, and this way we will rightfully get a
+    // new stub key. Also, include the hostname in the key so as to take care of those cases
+    // where the DNS name is different but the IP address remains the same.
+    String hostname = serverName.getHostname();
+    int port = serverName.getPort();
+    if (hostnameCanChange) {
+      try {
+        InetAddress ip = InetAddress.getByName(hostname);
+        return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port;
+      } catch (UnknownHostException e) {
+        LOG.warn("Cannot resolve " + hostname + ", please check your network", e);
+      }
+    }
+    return serviceName + "@" + hostname + ":" + port;
+  }
 }
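
A quick illustration of the two helpers above; the host name and port are made up, and since both methods are package-private this only compiles inside org.apache.hadoop.hbase.client.

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.ServerName;

public class ConnectionUtilsSketch {
  public static void main(String[] args) {
    // One retry means two attempts; Integer.MAX_VALUE is clamped instead of
    // overflowing to a negative attempt count.
    System.out.println(ConnectionUtils.retries2Attempts(1));                  // 2
    System.out.println(ConnectionUtils.retries2Attempts(Integer.MAX_VALUE)); // 2147483647

    ServerName sn = ServerName.valueOf("rs1.example.com", 16020, 1L);
    // Without resolution: "Service@rs1.example.com:16020".
    System.out.println(ConnectionUtils.getStubKey("Service", sn, false));
    // With resolution the key also embeds the resolved IP, e.g.
    // "Service@rs1.example.com-10.0.0.5:16020", so a server restarted on a
    // new IP gets a new stub.
    System.out.println(ConnectionUtils.getStubKey("Service", sn, true));
  }
}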

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index e940143..91a20ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
@@ -32,12 +34,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-
 /**
  * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
  * threadlocal outstanding timeouts so we don't persist too much.
@@ -66,18 +67,18 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
   public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
     this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
   }
-  
+
   public RpcRetryingCallerImpl(long pause, int retries,
       RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
     this.pause = pause;
-    this.maxAttempts = retries + 1;
+    this.maxAttempts = retries2Attempts(retries);
     this.interceptor = interceptor;
     context = interceptor.createEmptyContext();
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.tracker = new RetryingTimeTracker();
     this.rpcTimeout = rpcTimeout;
   }
-  
+
   @Override
   public void cancel(){
     cancelled.set(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
new file mode 100644
index 0000000..d385136
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+
+/**
+ * Cache the cluster registry data in memory and use a zk watcher to update it. The only
+ * exception is {@link #getClusterId()}, which fetches the data from zk directly.
+ */
+@InterfaceAudience.Private
+class ZKClusterRegistry implements ClusterRegistry {
+
+  private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class);
+
+  private final RecoverableZooKeeper zk;
+
+  private final ZNodePaths znodePaths;
+
+  ZKClusterRegistry(Configuration conf) throws IOException {
+    this.znodePaths = new ZNodePaths(conf);
+    int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
+    int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
+    int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+    this.zk = new RecoverableZooKeeper(ZKConfig.getZKQuorumServersString(conf), zkSessionTimeout,
+        null, zkRetry, zkRetryIntervalMs);
+  }
+
+  @Override
+  public String getClusterId() {
+    try {
+      byte[] data = zk.getData(znodePaths.clusterIdZNode, false, null);
+      if (data == null || data.length == 0) {
+        return null;
+      }
+      return ClusterId.parseFrom(data).toString();
+    } catch (Exception e) {
+      LOG.warn("failed to get cluster id", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      LOG.warn("close zookeeper failed", e);
+    }
+  }
+}
\ No newline at end of file
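
The constructor above reads its zookeeper retry settings from plain string keys. A small sketch of tuning them; the key strings are taken from the constructor, the values are only illustrative and take effect when a ZKClusterRegistry is created from this configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ZKRegistryConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Session timeout; default is HConstants.DEFAULT_ZK_SESSION_TIMEOUT.
    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 30000);
    // Recovery retries and interval; the defaults above are 3 and 1000 ms.
    conf.setInt("zookeeper.recovery.retry", 5);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 500);
  }
}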

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
index b7b9beb..775f8bd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -104,4 +106,26 @@ public class CollectionUtils {
     }
     return list.get(list.size() - 1);
   }
+
+  /**
+   * A supplier whose {@link #get()} method may throw an IOException.
+   */
+  @FunctionalInterface
+  public interface IOExceptionSupplier<V> {
+    V get() throws IOException;
+  }
+
+  /**
+   * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if
+   * the value already exists. So here we copy the contract of
+   * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)} but implement it
+   * with get and putIfAbsent. Note that this implementation does not guarantee that the supplier
+   * will be executed only once.
+   */
+  public static <K, V> V computeIfAbsentEx(ConcurrentMap<K, V> map, K key,
+      IOExceptionSupplier<V> supplier) throws IOException {
+    V v, newValue;
+    return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
+        && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+  }
 }
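
A minimal usage sketch for computeIfAbsentEx, with a hypothetical loader that throws a checked IOException, which is exactly what ConcurrentMap.computeIfAbsent cannot propagate.

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.hbase.util.CollectionUtils;

public class ComputeIfAbsentExSketch {

  private static final ConcurrentMap<String, byte[]> CACHE = new ConcurrentHashMap<>();

  // Hypothetical loader that may fail with an IOException.
  private static byte[] load(String key) throws IOException {
    return key.getBytes("UTF-8");
  }

  public static void main(String[] args) throws IOException {
    // A cache hit is a plain get (the fast path from HBASE-16648); on a miss
    // the supplier may run in several racing threads, but putIfAbsent ensures
    // a single winning value.
    byte[] value = CollectionUtils.computeIfAbsentEx(CACHE, "row-1", () -> load("row-1"));
    System.out.println(value.length);
  }
}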

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 15b3930..740f9ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -52,6 +52,7 @@ public class ReflectionUtils {
 
   private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
     try {
+      ctor.setAccessible(true);
       return ctor.newInstance(ctorArgs);
     } catch (IllegalAccessException e) {
       throw new UnsupportedOperationException(
@@ -65,14 +66,13 @@ public class ReflectionUtils {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public static <T> T newInstance(Class<T> type, Object... params) {
     return instantiate(type.getName(), findConstructor(type, params), params);
   }
 
   @SuppressWarnings("unchecked")
   public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
-    Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
-    Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
+    Constructor<T>[] constructors = (Constructor<T>[]) type.getDeclaredConstructors();
     for (Constructor<T> ctor : constructors) {
       Class<?>[] ctorParamTypes = ctor.getParameterTypes();
       if (ctorParamTypes.length != paramTypes.length) {
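
This ReflectionUtils change is what lets ClusterRegistryFactory instantiate package-private registries such as ZKClusterRegistry. A sketch of the effect; the Hidden class is hypothetical.

import org.apache.hadoop.hbase.util.ReflectionUtils;

public class ReflectionSketch {

  // Hypothetical class with only a private constructor.
  static class Hidden {
    private Hidden(String name) {
      System.out.println("created " + name);
    }
  }

  public static void main(String[] args) {
    // Previously getConstructors() saw only public constructors, so this call
    // would fail; with getDeclaredConstructors() and setAccessible(true) the
    // private constructor is found and invoked.
    ReflectionUtils.newInstance(Hidden.class, "demo");
  }
}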

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
new file mode 100644
index 0000000..fd3938e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncSingleRequestRpcRetryingCaller {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private AsyncConnectionImpl asyncConn;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() {
+    if (asyncConn != null) {
+      asyncConn.close();
+      asyncConn = null;
+    }
+  }
+
+  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetries) throws IOException {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
+    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetries);
+    asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
+  }
+
+  @Test
+  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
+    initConn(0, 100, 30);
+    // This will leave a cached entry in the location cache
+    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
+    TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
+      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
+    AsyncTable table = asyncConn.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+
+    // move back
+    TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
+      Bytes.toBytes(loc.getServerName().getServerName()));
+    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  private <T> CompletableFuture<T> failedFuture() {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(new RuntimeException("Inject error!"));
+    return future;
+  }
+
+  @Test
+  public void testMaxRetries() throws IOException, InterruptedException {
+    initConn(0, 10, 2);
+    try {
+      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+          .action((controller, loc, stub) -> failedFuture()).call().get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+  }
+
+  @Test
+  public void testOperationTimeout() throws IOException, InterruptedException {
+    initConn(0, 100, Integer.MAX_VALUE);
+    long startNs = System.nanoTime();
+    try {
+      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
+          .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
+          .call().get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+    long costNs = System.nanoTime() - startNs;
+    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
+    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
+  }
+
+  @Test
+  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
+    initConn(0, 100, 5);
+    AtomicBoolean errorTriggered = new AtomicBoolean(false);
+    AtomicInteger count = new AtomicInteger(0);
+    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+
+    try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) {
+      @Override
+      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+          boolean reload) {
+        if (tableName.equals(TABLE_NAME)) {
+          CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+          if (count.getAndIncrement() == 0) {
+            errorTriggered.set(true);
+            future.completeExceptionally(new RuntimeException("Inject error!"));
+          } else {
+            future.complete(loc);
+          }
+          return future;
+        } else {
+          return super.getRegionLocation(tableName, row, reload);
+        }
+      }
+
+      @Override
+      void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
+          Object exception, ServerName source) {
+      }
+    };
+        AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
+            User.getCurrent()) {
+
+          @Override
+          AsyncRegionLocator getLocator() {
+            return mockedLocator;
+          }
+        }) {
+      AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
+      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+      assertTrue(errorTriggered.get());
+      errorTriggered.set(false);
+      count.set(0);
+      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+      assertTrue(errorTriggered.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe75082/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
new file mode 100644
index 0000000..0667de3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTable {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+    assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
+    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+    table.delete(new Delete(ROW)).get();
+    result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertTrue(result.isEmpty());
+    assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
+  }
+
+  private byte[] concat(byte[] base, int index) {
+    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
+  }
+
+  @Test
+  public void testMultiple() throws Exception {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 100;
+    CountDownLatch putLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(
+      i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
+          .thenAccept(x -> putLatch.countDown()));
+    putLatch.await();
+    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
+    IntStream.range(0, count)
+        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> existsResp.add(x)));
+    for (int i = 0; i < count; i++) {
+      assertTrue(existsResp.take());
+    }
+    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
+    IntStream.range(0, count)
+        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+    for (int i = 0; i < count; i++) {
+      Pair<Integer, Result> pair = getResp.take();
+      assertArrayEquals(concat(VALUE, pair.getFirst()),
+        pair.getSecond().getValue(FAMILY, QUALIFIER));
+    }
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(
+      i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown()));
+    deleteLatch.await();
+    IntStream.range(0, count)
+        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> existsResp.add(x)));
+    for (int i = 0; i < count; i++) {
+      assertFalse(existsResp.take());
+    }
+    IntStream.range(0, count)
+        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+    for (int i = 0; i < count; i++) {
+      Pair<Integer, Result> pair = getResp.take();
+      assertTrue(pair.getSecond().isEmpty());
+    }
+  }
+}
