HBASE-12730 Backport HBASE-5162 (Basic client pushback mechanism) to branch-1


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/04a003d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/04a003d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/04a003d6

Branch: refs/heads/branch-1
Commit: 04a003d6a2b628bc551a55ae556c1b44485e3022
Parents: 05f4e0c
Author: Andrew Purtell <[email protected]>
Authored: Thu Jan 22 14:50:40 2015 -0800
Committer: Andrew Purtell <[email protected]>
Committed: Thu Jan 22 14:50:40 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 120 ++-
 .../hadoop/hbase/client/ClusterConnection.java  |  13 +-
 .../hadoop/hbase/client/ConnectionAdapter.java  |  12 +
 .../hadoop/hbase/client/ConnectionManager.java  |  26 +-
 .../hadoop/hbase/client/DelayingRunner.java     | 116 +++
 .../org/apache/hadoop/hbase/client/HTable.java  |   5 +-
 .../apache/hadoop/hbase/client/MultiAction.java |  17 +-
 .../org/apache/hadoop/hbase/client/Result.java  |  20 +
 .../hadoop/hbase/client/ResultStatsUtil.java    |  76 ++
 .../hbase/client/RpcRetryingCallerFactory.java  |  48 +-
 .../hbase/client/ServerStatisticTracker.java    |  75 ++
 .../client/StatsTrackingRpcRetryingCaller.java  |  78 ++
 .../client/backoff/ClientBackoffPolicy.java     |  42 +
 .../backoff/ClientBackoffPolicyFactory.java     |  59 ++
 .../backoff/ExponentialClientBackoffPolicy.java |  71 ++
 .../hbase/client/backoff/ServerStatistics.java  |  68 ++
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |   2 +-
 .../hbase/protobuf/ResponseConverter.java       |  16 +-
 .../client/TestClientExponentialBackoff.java    | 110 +++
 .../hadoop/hbase/client/TestDelayingRunner.java |  62 ++
 .../org/apache/hadoop/hbase/HConstants.java     |   6 +
 .../hbase/protobuf/generated/ClientProtos.java  | 810 ++++++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto   |  10 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   3 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  34 +-
 .../hbase/regionserver/RSRpcServices.java       |  19 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   2 +-
 .../hbase/client/HConnectionTestingUtility.java |   2 +-
 .../hadoop/hbase/client/TestClientPushback.java | 151 ++++
 29 files changed, 1984 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index eef4035..c72fb0f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -313,7 +315,8 @@ class AsyncProcess {
    * Uses default ExecutorService for this AP (must have been created with 
one).
    */
   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? 
extends Row> rows,
-      boolean atLeastOne, Batch.Callback<CResult> callback, boolean 
needResults) throws InterruptedIOException {
+      boolean atLeastOne, Batch.Callback<CResult> callback, boolean 
needResults)
+      throws InterruptedIOException {
     return submit(null, tableName, rows, atLeastOne, callback, needResults);
   }
 
@@ -374,7 +377,7 @@ class AsyncProcess {
           locationErrors = new ArrayList<Exception>();
           locationErrorRows = new ArrayList<Integer>();
           LOG.error("Failed to get region location ", ex);
-          // This action failed before creating ars. Add it to retained but do 
not add to submit list.
+          // This action failed before creating ars. Retain it, but do not add 
to submit list.
           // We will then add it to ars in an already-failed state.
           retainedActions.add(new Action<Row>(r, ++posInList));
           locationErrors.add(ex);
@@ -918,14 +921,12 @@ class AsyncProcess {
       return loc;
     }
 
-
-
     /**
      * Send a multi action structure to the servers, after a delay depending 
on the attempt
      * number. Asynchronous.
      *
      * @param actionsByServer the actions structured by regions
-     * @param numAttempt      the attempt number.
+     * @param numAttempt the attempt number.
      * @param actionsForReplicaThread original actions for replica thread; 
null on non-first call.
      */
     private void sendMultiAction(Map<ServerName, MultiAction<Row>> 
actionsByServer,
@@ -935,33 +936,98 @@ class AsyncProcess {
       int actionsRemaining = actionsByServer.size();
       // This iteration is by server (the HRegionLocation comparator is by 
server portion only).
       for (Map.Entry<ServerName, MultiAction<Row>> e : 
actionsByServer.entrySet()) {
-        final ServerName server = e.getKey();
-        final MultiAction<Row> multiAction = e.getValue();
+        ServerName server = e.getKey();
+        MultiAction<Row> multiAction = e.getValue();
         incTaskCounters(multiAction.getRegions(), server);
-        Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server));
-        if ((--actionsRemaining == 0) && reuseThread) {
-          runnable.run();
-        } else {
-          try {
-            pool.submit(runnable);
-          } catch (RejectedExecutionException ree) {
-            // This should never happen. But as the pool is provided by the 
end user, let's secure
-            //  this a little.
-            decTaskCounters(multiAction.getRegions(), server);
-            LOG.warn("#" + id + ", the task was rejected by the pool. This is 
unexpected." +
-                " Server is " + server.getServerName(), ree);
-            // We're likely to fail again, but this will increment the attempt 
counter, so it will
-            //  finish.
-            receiveGlobalFailure(multiAction, server, numAttempt, ree);
+        Collection<? extends Runnable> runnables = 
getNewMultiActionRunnable(server, multiAction,
+            numAttempt);
+        // make sure we correctly count the number of runnables before we try 
to reuse the send
+        // thread, in case we had to split the request into different 
runnables because of backoff
+        if (runnables.size() > actionsRemaining) {
+          actionsRemaining = runnables.size();
+        }
+
+        // run all the runnables
+        for (Runnable runnable : runnables) {
+          if ((--actionsRemaining == 0) && reuseThread) {
+            runnable.run();
+          } else {
+            try {
+              pool.submit(runnable);
+            } catch (RejectedExecutionException ree) {
+              // This should never happen. But as the pool is provided by the 
end user, let's secure
+              //  this a little.
+              decTaskCounters(multiAction.getRegions(), server);
+              LOG.warn("#" + id + ", the task was rejected by the pool. This 
is unexpected." +
+                  " Server is " + server.getServerName(), ree);
+              // We're likely to fail again, but this will increment the 
attempt counter, so it will
+              //  finish.
+              receiveGlobalFailure(multiAction, server, numAttempt, ree);
+            }
           }
         }
       }
+
       if (actionsForReplicaThread != null) {
         startWaitingForReplicaCalls(actionsForReplicaThread);
       }
     }
 
+    /**
+     * Build the runnable(s) for one server, grouping regions by backoff when stats are tracked.
+     */
+    private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
+        MultiAction<Row> multiAction, int numAttempt) {
+      // no stats to manage, just do the standard action
+      if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
+        return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
+            new SingleServerRequestRunnable(multiAction, numAttempt, server)));
+      }
+
+      // group the actions by the amount of delay
+      Map<Long, DelayingRunner<Row>> actions =
+          new HashMap<Long, DelayingRunner<Row>>(multiAction.size());
+      // split up the actions
+      for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
+        Long backoff = getBackoff(server, e.getKey());
+        DelayingRunner<Row> runner = actions.get(backoff);
+        if (runner == null) {
+          actions.put(backoff, new DelayingRunner<Row>(backoff, e));
+        } else {
+          runner.add(e);
+        }
+      }
+
+      List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
+      for (DelayingRunner<Row> runner : actions.values()) {
+        String traceText = "AsyncProcess.sendMultiAction";
+        Runnable runnable =
+            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
+        // use a delay runner only if we need to sleep for some time
+        if (runner.getSleepTime() > 0) {
+          runner.setRunner(runnable);
+          traceText = "AsyncProcess.clientBackoff.sendMultiAction";
+          runnable = runner;
+        }
+        runnable = Trace.wrap(traceText, runnable);
+        toReturn.add(runnable);
+      }
+      return toReturn;
+    }
+
+    /**
+     * Ask the connection's backoff policy how long to delay a request to the given region.
+     * @param server server location where the target region is hosted
+     * @param regionName name of the region to which we are going to write some data
+     * @return the amount of time the client should wait before submitting a request to it
+     */
+    private Long getBackoff(ServerName server, byte[] regionName) {
+      ServerStatisticTracker tracker = 
AsyncProcess.this.connection.getStatisticsTracker();
+      ServerStatistics stats = tracker.getStats(server);
+      return AsyncProcess.this.connection.getBackoffPolicy()
+          .getBackoffTime(server, regionName, stats);
+    }
+
     /**
      * Starts waiting to issue replica calls on a different thread; or issues 
them immediately.
      */
@@ -1169,6 +1235,13 @@ class AsyncProcess {
               ++failed;
             }
           } else {
+            // update the stats about the region, if it's a user table. We don't want to slow down
+            // updates to meta tables, especially from internal updates 
(master, etc).
+            if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
+              result = ResultStatsUtil.updateStats(result,
+                  AsyncProcess.this.connection.getStatisticsTracker(), server, 
regionName);
+            }
+
             if (callback != null) {
               try {
                 //noinspection unchecked
@@ -1497,7 +1570,6 @@ class AsyncProcess {
     }
   }
 
-
   @VisibleForTesting
   /** Create AsyncRequestFuture. Isolated to be easily overridden in the 
tests. */
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 62570b9..05d5c63 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@@ -288,5 +290,14 @@ public interface ClusterConnection extends HConnection {
    * @return true if this is a managed connection.
    */
   boolean isManaged();
-}
 
+  /**
+   * @return the current statistics tracker associated with this connection
+   */
+  ServerStatisticTracker getStatisticsTracker();
+
+  /**
+   * @return the configured client backoff policy
+   */
+  ClientBackoffPolicy getBackoffPolicy();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index 7c8f8d8..a1b71f4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@@ -442,4 +444,14 @@ abstract class ConnectionAdapter implements 
ClusterConnection {
   public boolean isManaged() {
     return wrappedConnection.isManaged();
   }
+
+  @Override
+  public ServerStatisticTracker getStatisticsTracker() {
+    return wrappedConnection.getStatisticsTracker();
+  }
+
+  @Override
+  public ClientBackoffPolicy getBackoffPolicy() {
+    return wrappedConnection.getBackoffPolicy();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 7bfa972..581bde8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -537,6 +539,8 @@ class ConnectionManager {
     final int rpcTimeout;
     private NonceGenerator nonceGenerator = null;
     private final AsyncProcess asyncProcess;
+    // single tracker per connection
+    private final ServerStatisticTracker stats;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -592,6 +596,8 @@ class ConnectionManager {
      */
      Registry registry;
 
+    private final ClientBackoffPolicy backoffPolicy;
+
      HConnectionImplementation(Configuration conf, boolean managed) throws 
IOException {
        this(conf, managed, null, null);
      }
@@ -666,9 +672,11 @@ class ConnectionManager {
       } else {
         this.nonceGenerator = new NoNonceGenerator();
       }
+      stats = ServerStatisticTracker.create(conf);
       this.asyncProcess = createAsyncProcess(this.conf);
       this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, 
interceptor);
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, 
interceptor, this.stats);
+      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     }
 
     @Override
@@ -2184,7 +2192,8 @@ class ConnectionManager {
     protected AsyncProcess createAsyncProcess(Configuration conf) {
       // No default pool available.
       return new AsyncProcess(this, conf, this.batchPool,
-          RpcRetryingCallerFactory.instantiate(conf), false, 
RpcControllerFactory.instantiate(conf));
+          RpcRetryingCallerFactory.instantiate(conf, 
this.getStatisticsTracker()), false,
+          RpcControllerFactory.instantiate(conf));
     }
 
     @Override
@@ -2192,6 +2201,16 @@ class ConnectionManager {
       return asyncProcess;
     }
 
+    @Override
+    public ServerStatisticTracker getStatisticsTracker() {
+      return this.stats;
+    }
+
+    @Override
+    public ClientBackoffPolicy getBackoffPolicy() {
+      return this.backoffPolicy;
+    }
+
     /*
      * Return the number of cached region for a table. It will only be called
      * from a unit test.
@@ -2469,7 +2488,8 @@ class ConnectionManager {
 
     @Override
     public RpcRetryingCallerFactory 
getNewRpcRetryingCallerFactory(Configuration conf) {
-      return RpcRetryingCallerFactory.instantiate(conf, this.interceptor);
+      return RpcRetryingCallerFactory
+          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
new file mode 100644
index 0000000..83c73b6
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper for a runnable for a group of actions for a single regionserver.
+ * <p>
+ * Actions are collected per region through {@link #add}; {@link #run()} then waits for the
+ * configured sleep time before invoking the wrapped runnable.
+ * </p>
+ * <p>
+ * This class exists to simulate using a ScheduledExecutorService with just a regular
+ * ExecutorService and Runnables. It is used for legacy reasons in the client; this could
+ * only be removed if we change the expectations in HTable around the pool the client is able to
+ * pass in and even if we deprecate the current APIs would require keeping this class around
+ * for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class DelayingRunner<T> implements Runnable {
+  private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
+
+  private final Object sleepLock = new Object();
+  private boolean triggerWake = false;
+  private long sleepTime;
+  private MultiAction<T> actions = new MultiAction<T>();
+  private Runnable runnable;
+
+  /** Create a runner that delays by sleepTime ms, seeded with the actions of one region. */
+  public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
+    this.sleepTime = sleepTime;
+    add(e);
+  }
+
+  /** Set the runnable to invoke once the delay is over; {@link #run()} requires it. */
+  public void setRunner(Runnable runner) {
+    this.runnable = runner;
+  }
+
+  @Override
+  public void run() {
+    if (!sleep()) {
+      LOG.warn("Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
+    }
+    //TODO maybe we should consider switching to a listenableFuture for the actual callable and
+    // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
+    // even if we get interrupted here, but for now, we still need to run so we decrement the
+    // outstanding tasks
+    this.runnable.run();
+  }
+
+  /**
+   * Sleep for the configured amount of time.
+   * <p>
+   * This is nearly a copy of what the Sleeper does, but with the ability to know if you
+   * got interrupted while sleeping.
+   * </p>
+   *
+   * @return <tt>true</tt> if the sleep completed successfully,
+   * but otherwise <tt>false</tt> if the sleep was interrupted.
+   */
+  private boolean sleep() {
+    final long startTime = EnvironmentEdgeManager.currentTime();
+    long waitTime = sleepTime;
+    while (waitTime > 0) {
+      try {
+        synchronized (sleepLock) {
+          if (triggerWake) break;
+          sleepLock.wait(waitTime);
+        }
+      } catch (InterruptedException iex) {
+        return false;
+      }
+      // Recompute from the fixed start time. Subtracting the total elapsed time from the
+      // already-reduced waitTime would count earlier waits twice and end the sleep early.
+      waitTime = sleepTime - (EnvironmentEdgeManager.currentTime() - startTime);
+    }
+    return true;
+  }
+
+  /** Add the actions of one region (the entry key is the region name) to this runner. */
+  public void add(Map.Entry<byte[], List<Action<T>>> e) {
+    actions.add(e.getKey(), e.getValue());
+  }
+
+  public MultiAction<T> getActions() {
+    return actions;
+  }
+
+  public long getSleepTime() {
+    return sleepTime;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 9307667..e124b26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1872,8 +1872,9 @@ public class HTable implements HTableInterface {
 
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
-            RpcRetryingCallerFactory.instantiate(configuration), true,
-            RpcControllerFactory.instantiate(configuration));
+            RpcRetryingCallerFactory.instantiate(configuration, 
connection.getStatisticsTracker()),
+            true, RpcControllerFactory.instantiate(configuration));
+
     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
           @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index eefe40d..0a9055e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,12 +69,24 @@ public final class MultiAction<R> {
    * @param a
    */
   public void add(byte[] regionName, Action<R> a) {
+    add(regionName, Arrays.asList(a));
+  }
+
+  /**
+   * Add a list of Actions to this container based on its regionName. If the regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName
+   * @param actionList list of actions to add for the region
+   */
+  public void add(byte[] regionName, List<Action<R>> actionList){
     List<Action<R>> rsActions = actions.get(regionName);
     if (rsActions == null) {
-      rsActions = new ArrayList<Action<R>>();
+      rsActions = new ArrayList<Action<R>>(actionList.size());
       actions.put(regionName, rsActions);
     }
-    rsActions.add(a);
+    rsActions.addAll(actionList);
   }
 
   public void setNonceGroup(long nonceGroup) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 401710e..6730863 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -94,6 +97,7 @@ public class Result implements CellScannable, CellScanner {
    * Index for where we are when Result is acting as a {@link CellScanner}.
    */
   private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
+  private ClientProtos.RegionLoadStats stats;
 
   /**
    * Creates an empty Result w/ no KeyValue payload; returns null if you call 
{@link #rawCells()}.
@@ -871,4 +875,20 @@ public class Result implements CellScannable, CellScanner {
   public boolean isStale() {
     return stale;
   }
+
+  /**
+   * Attach load statistics for the originating region, replacing any previously attached ones
+   * @param loadStats statistics about the current region from which this was returned
+   */
+  public void addResults(ClientProtos.RegionLoadStats loadStats) {
+    this.stats = loadStats;
+  }
+
+  /**
+   * @return the statistics about the region from which this was returned, or
+   * <tt>null</tt> if none were attached (for example when stats are disabled).
+   */
+  public ClientProtos.RegionLoadStats getStats() {
+    return stats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
new file mode 100644
index 0000000..3caa63e
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Static helpers that feed region load statistics carried by a {@link Result} into a tracker
+ */
+@InterfaceAudience.Private
+public final class ResultStatsUtil {
+
+  private ResultStatsUtil() {
+    //private ctor for util class
+  }
+
+  /**
+   * Update the stats for the specified region if the passed object is a {@link Result} that
+   * carries region load statistics; any other object is passed through untouched.
+   * @param r object that contains the result and possibly the statistics about the region
+   * @param serverStats stats tracker to update from the result
+   * @param server server from which the result was obtained
+   * @param regionName full region name for the stats; if <tt>null</tt> nothing is recorded
+   * @return the passed object <tt>r</tt>, always unchanged
+   */
+  public static <T> T updateStats(T r, ServerStatisticTracker serverStats,
+      ServerName server, byte[] regionName) {
+    if (!(r instanceof Result)) {
+      return r;
+    }
+    Result result = (Result) r;
+    // early exit if there are no stats to collect
+    ClientProtos.RegionLoadStats stats = result.getStats();
+    if (stats == null) {
+      return r;
+    }
+    if (regionName != null) {
+      serverStats.updateRegionStats(server, regionName, stats);
+    }
+
+    return r;
+  }
+
+  /**
+   * Overload taking server and region name from <tt>regionLocation</tt>, which may be null
+   */
+  public static <T> T updateStats(T r, ServerStatisticTracker stats,
+      HRegionLocation regionLocation) {
+    byte[] regionName = null;
+    ServerName server = null;
+    if (regionLocation != null) {
+      server = regionLocation.getServerName();
+      regionName = regionLocation.getRegionInfo().getRegionName();
+    }
+
+    return updateStats(r, stats, server, regionName);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 9f05997..1bf7bb0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -35,6 +35,8 @@ public class RpcRetryingCallerFactory {
   private final int retries;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  private final boolean enableBackPressure;
+  private ServerStatisticTracker stats;
 
   public RpcRetryingCallerFactory(Configuration conf) {
     this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
@@ -49,27 +51,57 @@ public class RpcRetryingCallerFactory {
     startLogErrorsCnt = 
conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
         AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.interceptor = interceptor;
+    enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+  }
+
+  /**
+   * Set the tracker that should be used for tracking statistics about the 
server
+   */
+  public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
+    this.stats = statisticTracker;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new 
objects
     //  is cheap as it does not require parsing a complex structure.
-      return new RpcRetryingCaller<T>(pause, retries, interceptor, 
startLogErrorsCnt);
+    RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, 
interceptor,
+        startLogErrorsCnt);
+
+    // wrap it with stats, if we are tracking them
+    if (enableBackPressure && this.stats != null) {
+      caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, 
interceptor,
+          startLogErrorsCnt, stats);
+    }
+
+    return caller;
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration 
configuration) {
-    return instantiate(configuration, 
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+    return instantiate(configuration, 
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
   }
-  
+
   public static RpcRetryingCallerFactory instantiate(Configuration 
configuration,
-      RetryingCallerInterceptor interceptor) {
+      ServerStatisticTracker stats) {
+    return instantiate(configuration, 
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
+  }
+
+  public static RpcRetryingCallerFactory instantiate(Configuration 
configuration,
+      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, 
clazzName);
+    RpcRetryingCallerFactory factory;
     if (rpcCallerFactoryClazz.equals(clazzName)) {
-      return new RpcRetryingCallerFactory(configuration, interceptor);
+      factory = new RpcRetryingCallerFactory(configuration, interceptor);
+    } else {
+      factory = ReflectionUtils.instantiateWithCustomCtor(
+          rpcCallerFactoryClazz, new Class[] { Configuration.class },
+          new Object[] { configuration });
     }
-    return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
-      new Class[] { Configuration.class }, new Object[] { configuration });
+
+    // setting for backwards compat with existing caller factories, rather 
than in the ctor
+    factory.setStatisticTracker(stats);
+    return factory;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
new file mode 100644
index 0000000..42da0b3
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks region load statistics for multiple servers, keyed by the server that reported them
+ */
+@InterfaceAudience.Private
+public class ServerStatisticTracker {
+
+  /** Statistics per server, keyed by the server they were reported by. */
+  private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
+      new ConcurrentHashMap<ServerName, ServerStatistics>();
+
+  /**
+   * Record the latest load statistics for a region under the given server.
+   *
+   * @param server server the statistics are filed under
+   * @param region name of the region the statistics describe
+   * @param currentStats statistics to store for the region
+   */
+  public void updateRegionStats(ServerName server, byte[] region,
+      ClientProtos.RegionLoadStats currentStats) {
+    ServerStatistics stat = stats.get(server);
+    if (stat == null) {
+      // We don't have stats for that server yet, so we need to make an entry.
+      // If we race with another thread it's a harmless unnecessary allocation:
+      // putIfAbsent keeps whichever entry got registered first and we use that one.
+      ServerStatistics created = new ServerStatistics();
+      ServerStatistics existing = stats.putIfAbsent(server, created);
+      stat = existing != null ? existing : created;
+    }
+    stat.update(region, currentStats);
+  }
+
+  /**
+   * @param server server to look up
+   * @return the statistics tracked for the server, or {@code null} if none were recorded yet
+   */
+  public ServerStatistics getStats(ServerName server) {
+    return this.stats.get(server);
+  }
+
+  /**
+   * @param conf configuration to read {@link HConstants#ENABLE_CLIENT_BACKPRESSURE} from
+   * @return a new tracker when client backpressure is enabled, otherwise {@code null}
+   */
+  public static ServerStatisticTracker create(Configuration conf) {
+    if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) {
+      return null;
+    }
+    return new ServerStatisticTracker();
+  }
+
+  @VisibleForTesting
+  ServerStatistics getServerStatsForTesting(ServerName server) {
+    return stats.get(server);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
new file mode 100644
index 0000000..fc175bb
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * An {@link RpcRetryingCaller} that will update the per-region stats for the 
call on return,
+ * if stats are available
+ */
+@InterfaceAudience.Private
+public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> {
+  /** Tracker that receives the region load statistics found in call results. */
+  private final ServerStatisticTracker stats;
+
+  public StatsTrackingRpcRetryingCaller(long pause, int retries, int startLogErrorsCnt,
+      ServerStatisticTracker stats) {
+    super(pause, retries, startLogErrorsCnt);
+    this.stats = stats;
+  }
+
+  public StatsTrackingRpcRetryingCaller(long pause, int retries,
+      RetryingCallerInterceptor interceptor, int startLogErrorsCnt,
+      ServerStatisticTracker stats) {
+    super(pause, retries, interceptor, startLogErrorsCnt);
+    this.stats = stats;
+  }
+
+  /** Runs the call with retries, then hands any statistics in the result to the tracker. */
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = super.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  /** Runs the call without retries, then hands any statistics in the result to the tracker. */
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    // delegate to the non-retrying variant: going through callWithRetries here would
+    // silently retry calls whose callers explicitly asked for a single attempt
+    T result = super.callWithoutRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  /**
+   * Update the tracked statistics from the result of a single-region call.
+   *
+   * @param result value returned by the call
+   * @param callable callable that produced the result
+   * @return the result, as returned by {@link ResultStatsUtil} or unchanged when not tracked
+   */
+  private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
+    // don't track stats about requests that aren't to regionservers
+    if (!(callable instanceof RegionServerCallable)) {
+      return result;
+    }
+
+    // multi-server callables span multiple regions, so they don't have a location,
+    // but they are region server callables, so we have to handle them when we process the
+    // result in AsyncProcess#receiveMultiAction, not in here
+    if (callable instanceof MultiServerCallable) {
+      return result;
+    }
+
+    // update the stats for the single server callable
+    RegionServerCallable<T> regionCallable = (RegionServerCallable<T>) callable;
+    HRegionLocation location = regionCallable.getLocation();
+    return ResultStatsUtil.updateStats(result, stats, location);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
new file mode 100644
index 0000000..94e434f
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Configurable policy for the amount of time a client should wait before sending a new request
+ * to the server, given the server load statistics.
+ * <p>
+ * Must have a single-argument constructor that takes a {@link 
org.apache.hadoop.conf.Configuration}
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ClientBackoffPolicy {
+
+  /** Configuration key naming the {@link ClientBackoffPolicy} implementation class to use. */
+  public static final String BACKOFF_POLICY_CLASS =
+      "hbase.client.statistics.backoff-policy";
+
+  /**
+   * @param serverName server to compute the backoff for
+   * @param region name of the region to compute the backoff for
+   * @param stats load statistics known for the server. NOTE(review): the implementations in
+   *          this change accept {@code null} here and return 0 - confirm this is part of the
+   *          contract
+   * @return the number of ms to wait on the client based on the given statistics
+   */
+  public long getBackoffTime(ServerName serverName, byte[] region, 
ServerStatistics stats);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
new file mode 100644
index 0000000..879a0e2
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ClientBackoffPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class);
+
+  /** Static factory methods only; never instantiated. */
+  private ClientBackoffPolicyFactory() {
+  }
+
+  /**
+   * Instantiate the policy class named under {@link ClientBackoffPolicy#BACKOFF_POLICY_CLASS},
+   * falling back to {@link NoBackoffPolicy} when the key is not set.
+   *
+   * @param conf configuration to read the class name from; it is also handed to the
+   *          single-argument constructor of the policy
+   * @return the newly created policy
+   */
+  public static ClientBackoffPolicy create(Configuration conf) {
+    String fallback = NoBackoffPolicy.class.getName();
+    String policyClassName = conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, fallback);
+    Class<?>[] ctorArgTypes = new Class<?>[] { Configuration.class };
+    Object[] ctorArgs = new Object[] { conf };
+    return ReflectionUtils.instantiateWithCustomCtor(policyClassName, ctorArgTypes, ctorArgs);
+  }
+
+  /**
+   * Policy used when nothing else is configured: it always answers with a backoff of zero,
+   * whatever the load statistics say.
+   */
+  public static class NoBackoffPolicy implements ClientBackoffPolicy {
+    public NoBackoffPolicy(Configuration conf){
+      // the configuration is unused; the constructor exists because policies are created
+      // reflectively through a single Configuration-argument constructor
+    }
+
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+      return 0;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
new file mode 100644
index 0000000..6e75670
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple exponential backoff policy for the client that uses percent^4 times the
+ * max backoff to generate the backoff time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
+
+  private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class);
+
+  private static final long ONE_MINUTE = 60 * 1000;
+  /** Default upper bound on the backoff, in ms (five minutes). */
+  public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
+  /** Configuration key for the upper bound on the backoff, in ms. */
+  public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max";
+  // read once from the configuration in the constructor and never changed afterwards
+  private final long maxBackoff;
+
+  public ExponentialClientBackoffPolicy(Configuration conf) {
+    this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
+  }
+
+  /**
+   * Compute the backoff as (memstore load percent / 100)^4 times the configured max backoff.
+   *
+   * @param serverName not used by the calculation
+   * @param region name of the region whose statistics are looked up
+   * @param stats statistics for the server, may be {@code null}
+   * @return 0 when there are no statistics for the server or the region, otherwise the
+   *         computed backoff in ms, never more than the configured max backoff
+   */
+  @Override
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+    // no stats for the server yet, so don't backoff
+    if (stats == null) {
+      return 0;
+    }
+
+    ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region);
+    // no stats for the region yet - don't backoff
+    if (regionStats == null) {
+      return 0;
+    }
+
+    // raise the load, expressed as a fraction of 1, to the 4th power: the result stays
+    // tiny for low loads and climbs steeply towards 1 as the load approaches 100 percent
+    double percent = regionStats.getMemstoreLoadPercent() / 100.0;
+    double multiplier = Math.pow(percent, 4.0);
+    // shouldn't ever happen, but just in case something changes in the statistic data
+    if (multiplier > 1) {
+      LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " +
+          "down to the max backoff");
+      multiplier = 1;
+    }
+    return (long) (multiplier * maxBackoff);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
new file mode 100644
index 0000000..a3b8e11
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Track the per-region statistics reported by a single server
+ */
+@InterfaceAudience.Private
+public class ServerStatistics {
+
+  // One instance is shared by every thread that reports statistics for the same server
+  // (see ServerStatisticTracker#updateRegionStats), so the map has to tolerate concurrent
+  // puts; an unsynchronized TreeMap can be structurally corrupted by them.
+  private final ConcurrentSkipListMap<byte[], RegionStatistics> stats =
+      new ConcurrentSkipListMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update,
+   * as something gets set
+   * @param region name of the region the statistics belong to
+   * @param currentStats latest statistics reported for the region
+   */
+  public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) {
+    RegionStatistics regionStat = this.stats.get(region);
+    if (regionStat == null) {
+      // if another thread registered the region first, keep its entry and update that one
+      RegionStatistics created = new RegionStatistics();
+      RegionStatistics existing = this.stats.putIfAbsent(region, created);
+      regionStat = existing != null ? existing : created;
+    }
+
+    regionStat.update(currentStats);
+  }
+
+  /**
+   * @param regionName name of the region to look up, may be {@code null}
+   * @return the statistics recorded for the region, or {@code null} if there are none
+   */
+  @InterfaceAudience.Private
+  public RegionStatistics getStatsForRegion(byte[] regionName){
+    // the concurrent map rejects null keys, so answer "no stats" for them directly
+    if (regionName == null) {
+      return null;
+    }
+    return stats.get(regionName);
+  }
+
+  /** Latest statistics known for one region. */
+  public static class RegionStatistics{
+    // volatile so that a value written by one thread is visible to readers on other threads
+    private volatile int memstoreLoad = 0;
+
+    /** Replace the stored memstore load with the one carried by the given statistics. */
+    public void update(ClientProtos.RegionLoadStats currentStats) {
+      this.memstoreLoad = currentStats.getMemstoreLoad();
+    }
+
+    /** @return the memstore load most recently stored by {@link #update} (0 initially) */
+    public int getMemstoreLoadPercent(){
+      return this.memstoreLoad;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 026c64b..cabe5e6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -61,7 +61,7 @@ public class RegionCoprocessorRpcChannel extends 
CoprocessorRpcChannel{
     this.connection = conn;
     this.table = table;
     this.row = row;
-    this.rpcFactory = 
RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
+    this.rpcFactory = 
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null);
     this.operationTimeout = conn.getConfiguration().getInt(
         HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 725736a..9493668 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -114,17 +114,23 @@ public final class ResponseConverter {
       }
 
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+        Object responseValue;
         if (roe.hasException()) {
-          results.add(regionName, roe.getIndex(), 
ProtobufUtil.toException(roe.getException()));
+          responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {
-          results.add(regionName, roe.getIndex(), 
ProtobufUtil.toResult(roe.getResult(), cells));
+          responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
+          // add the load stats, if we got any
+          if (roe.hasLoadStats()) {
+            ((Result) responseValue).addResults(roe.getLoadStats());
+          }
         } else if (roe.hasServiceResult()) {
-          results.add(regionName, roe.getIndex(), roe.getServiceResult());
+          responseValue = roe.getServiceResult();
         } else {
           // no result & no exception. Unexpected.
           throw new IllegalStateException("No result & no exception roe=" + 
roe +
               " for region " + actions.getRegion());
         }
+        results.add(regionName, roe.getIndex(), responseValue);
       }
     }
 
@@ -149,9 +155,11 @@ public final class ResponseConverter {
    * @param r
    * @return an action result builder
    */
-  public static ResultOrException.Builder buildActionResult(final 
ClientProtos.Result r) {
+  public static ResultOrException.Builder buildActionResult(final 
ClientProtos.Result r,
+      ClientProtos.RegionLoadStats stats) {
     ResultOrException.Builder builder = ResultOrException.newBuilder();
     if (r != null) builder.setResult(r);
+    if(stats != null) builder.setLoadStats(stats);
     return builder;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
new file mode 100644
index 0000000..88e409d
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientExponentialBackoff {
+
+  ServerName server = Mockito.mock(ServerName.class);
+  byte[] regionname = Bytes.toBytes("region");
+
+  /** A missing server, region or set of statistics must never produce a backoff. */
+  @Test
+  public void testNulls() {
+    ExponentialClientBackoffPolicy policy =
+        new ExponentialClientBackoffPolicy(new Configuration(false));
+    assertEquals(0, policy.getBackoffTime(null, null, null));
+
+    // server name doesn't matter to calculation, but check it now anyways
+    assertEquals(0, policy.getBackoffTime(server, null, null));
+    assertEquals(0, policy.getBackoffTime(server, regionname, null));
+
+    // check when no stats for the region yet
+    assertEquals(0, policy.getBackoffTime(server, regionname, new ServerStatistics()));
+  }
+
+  /** A load of 100 or more yields exactly the max backoff; anything below yields less. */
+  @Test
+  public void testMaxLoad() {
+    final long defaultMax = ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF;
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy defaultPolicy = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    update(stats, 100);
+    assertEquals(defaultMax, defaultPolicy.getBackoffTime(server, regionname, stats));
+
+    // another policy with a different max timeout
+    final long shortMax = 100;
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, shortMax);
+    ExponentialClientBackoffPolicy shortPolicy = new ExponentialClientBackoffPolicy(conf);
+    assertEquals(shortMax, shortPolicy.getBackoffTime(server, regionname, stats));
+
+    // test beyond 100 still doesn't exceed the max
+    update(stats, 101);
+    assertEquals(defaultMax, defaultPolicy.getBackoffTime(server, regionname, stats));
+    assertEquals(shortMax, shortPolicy.getBackoffTime(server, regionname, stats));
+
+    // and that when we are below 100, it's less than the max timeout
+    update(stats, 99);
+    assertTrue(defaultPolicy.getBackoffTime(server, regionname, stats) < defaultMax);
+    assertTrue(shortPolicy.getBackoffTime(server, regionname, stats) < shortMax);
+  }
+
+  /**
+   * Make sure that we get results in the order that we expect - the backoff for a load of 1
+   * should be less than the backoff for 10, which should be less than that for 50.
+   */
+  @Test
+  public void testResultOrdering() {
+    Configuration conf = new Configuration(false);
+    // make the max timeout really high so we get differentiation between load factors
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE);
+    ExponentialClientBackoffPolicy policy = new ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    // start from the backoff with no statistics at all, then require a strict increase
+    long previous = policy.getBackoffTime(server, regionname, stats);
+    for (int load = 1; load <= 100; load++) {
+      update(stats, load);
+      long next = policy.getBackoffTime(server, regionname, stats);
+      assertTrue(
+          "Previous backoff time" + previous + " >= " + next + ", the next backoff time for " +
+              "load " + load, previous < next);
+      previous = next;
+    }
+  }
+
+  /** Store the given memstore load for the test region in the given statistics. */
+  private void update(ServerStatistics stats, int load) {
+    ClientProtos.RegionLoadStats.Builder builder = ClientProtos.RegionLoadStats.newBuilder();
+    builder.setMemstoreLoad(load);
+    stats.update(regionname, builder.build());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
new file mode 100644
index 0000000..4348100
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestDelayingRunner {
+
+  private static final TableName DUMMY_TABLE =
+      TableName.valueOf("DUMMY_TABLE");
+  private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
+  private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
+  private static HRegionInfo hri1 =
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testDelayingRunner() throws Exception{
+    MultiAction<Row> ma = new MultiAction<Row>();
+    ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0));
+    final AtomicLong endTime = new AtomicLong();
+    final long sleepTime = 1000;
+    DelayingRunner runner = new DelayingRunner(sleepTime, 
ma.actions.entrySet().iterator().next());
+    runner.setRunner(new Runnable() {
+      @Override
+      public void run() {
+        endTime.set(EnvironmentEdgeManager.currentTime());
+      }      
+    });
+    long startTime = EnvironmentEdgeManager.currentTime();
+    runner.run();
+    long delay = endTime.get() - startTime;
+    assertTrue("DelayingRunner did not delay long enough", delay >= sleepTime);
+    assertFalse("DelayingRunner delayed too long", delay > sleepTime + 
sleepTime*0.2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/04a003d6/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index b18e148..22543ae 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1115,6 +1115,12 @@ public final class HConstants {
   public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
       "hbase.client.fast.fail.interceptor.impl";
 
+  /** Config key for whether the server should send backpressure and whether the client 
should listen to
+   * that backpressure from the server */
+  public static final String ENABLE_CLIENT_BACKPRESSURE = 
"hbase.client.backpressure.enabled";
+  public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false;
+
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Reply via email to