HBASE-12729 Backport HBASE-5162 (Basic client pushback mechanism) to 0.98

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85e7270b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85e7270b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85e7270b

Branch: refs/heads/0.98
Commit: 85e7270b6f8cfb593c1cba137241f2aad986a3e1
Parents: d081756
Author: Andrew Purtell <[email protected]>
Authored: Mon Jan 19 17:02:27 2015 -0800
Committer: Andrew Purtell <[email protected]>
Committed: Mon Jan 19 17:02:27 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 199 +++--
 .../hadoop/hbase/client/ClientScanner.java      |   5 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |   4 +-
 .../hadoop/hbase/client/DelayingRunner.java     | 117 +++
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |   4 +-
 .../hadoop/hbase/client/HConnectionManager.java |  32 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |   4 +-
 .../hadoop/hbase/client/HTableMultiplexer.java  |   8 +-
 .../apache/hadoop/hbase/client/MultiAction.java |  17 +
 .../org/apache/hadoop/hbase/client/Result.java  |  17 +
 .../hadoop/hbase/client/ResultStatsUtil.java    |  88 ++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |   4 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |  43 +-
 .../hbase/client/ServerStatisticTracker.java    |  74 ++
 .../hbase/client/StatisticsHConnection.java     |  39 +
 .../client/StatsTrackingRpcRetryingCaller.java  |  63 ++
 .../client/backoff/ClientBackoffPolicy.java     |  42 +
 .../backoff/ClientBackoffPolicyFactory.java     |  59 ++
 .../backoff/ExponentialClientBackoffPolicy.java |  71 ++
 .../hbase/client/backoff/ServerStatistics.java  |  68 ++
 .../hbase/protobuf/ResponseConverter.java       |  19 +-
 .../client/TestClientExponentialBackoff.java    | 110 +++
 .../hadoop/hbase/client/TestDelayingRunner.java |  62 ++
 .../org/apache/hadoop/hbase/HConstants.java     |   5 +
 .../hbase/protobuf/generated/ClientProtos.java  | 808 ++++++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto   |  10 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  33 +-
 .../hbase/regionserver/HRegionServer.java       |  25 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   5 +-
 .../hadoop/hbase/client/TestClientPushback.java | 160 ++++
 31 files changed, 2074 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index d206c71..bd15837 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -281,6 +283,19 @@ class AsyncProcess<CResult> {
    * @param atLeastOne true if we should submit at least a subset.
    */
   public void submit(List<? extends Row> rows, boolean atLeastOne) throws 
InterruptedIOException {
+    submit(rows, atLeastOne, null);
+  }
+
+  /**
+   * Extract from the rows list what we can submit. The rows we can not submit 
are kept in the
+   * list.
+   *
+   * @param rows - the submitted row. Modified by the method: we remove the 
rows we took.
+   * @param atLeastOne true if we should submit at least a subset.
+   * @param batchCallback Batch callback. Only called on success
+   */
+  public void submit(List<? extends Row> rows, boolean atLeastOne,
+      Batch.Callback<CResult> batchCallback) throws InterruptedIOException {
     if (rows.isEmpty()) {
       return;
     }
@@ -331,7 +346,7 @@ class AsyncProcess<CResult> {
     } while (retainedActions.isEmpty() && atLeastOne && !hasError());
 
     HConnectionManager.ServerErrorTracker errorsByServer = 
createServerErrorTracker();
-    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
+    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, 
batchCallback);
   }
 
   /**
@@ -520,7 +535,7 @@ class AsyncProcess<CResult> {
     }
 
     if (!actionsByServer.isEmpty()) {
-      sendMultiAction(initialActions, actionsByServer, numAttempt, 
errorsByServer);
+      sendMultiAction(initialActions, actionsByServer, numAttempt, 
errorsByServer, null);
     }
   }
 
@@ -535,61 +550,141 @@ class AsyncProcess<CResult> {
   public void sendMultiAction(final List<Action<Row>> initialActions,
                               Map<HRegionLocation, MultiAction<Row>> 
actionsByServer,
                               final int numAttempt,
-                              final HConnectionManager.ServerErrorTracker 
errorsByServer) {
+                              final HConnectionManager.ServerErrorTracker 
errorsByServer,
+                              Batch.Callback<CResult> batchCallback) {
     // Send the queries and add them to the inProgress list
     // This iteration is by server (the HRegionLocation comparator is by 
server portion only).
     for (Map.Entry<HRegionLocation, MultiAction<Row>> e : 
actionsByServer.entrySet()) {
-      final HRegionLocation loc = e.getKey();
-      final MultiAction<Row> multiAction = e.getValue();
-      incTaskCounters(multiAction.getRegions(), loc.getServerName());
-      Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new 
Runnable() {
-        @Override
-        public void run() {
-          MultiResponse res;
+      HRegionLocation loc = e.getKey();
+      MultiAction<Row> multiAction = e.getValue();
+      Collection<? extends Runnable> runnables = 
getNewMultiActionRunnable(initialActions, loc,
+        multiAction, numAttempt, errorsByServer, batchCallback);
+      for (Runnable runnable: runnables) {
+        try {
+          incTaskCounters(multiAction.getRegions(), loc.getServerName());
+          this.pool.submit(runnable);
+        } catch (RejectedExecutionException ree) {
+          // This should never happen. But as the pool is provided by the end 
user, let's secure
+          //  this a little.
+          decTaskCounters(multiAction.getRegions(), loc.getServerName());
+          LOG.warn("#" + id + ", the task was rejected by the pool. This is 
unexpected." +
+            " Server is " + loc.getServerName(), ree);
+          // We're likely to fail again, but this will increment the attempt 
counter, so it will
+          //  finish.
+          receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, 
ree, errorsByServer);
+        }
+      }
+    }
+  }
+
+  private Runnable getNewSingleServerRunnable(
+      final List<Action<Row>> initialActions,
+      final HRegionLocation loc,
+      final MultiAction<Row> multiAction,
+      final int numAttempt,
+      final HConnectionManager.ServerErrorTracker errorsByServer,
+      final Batch.Callback<CResult> batchCallback) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        MultiResponse res;
+        try {
+          MultiServerCallable<Row> callable = createCallable(loc, multiAction);
           try {
-            MultiServerCallable<Row> callable = createCallable(loc, 
multiAction);
-            try {
-              res = createCaller(callable).callWithoutRetries(callable, 
timeout);
-            } catch (IOException e) {
-              // The service itself failed . It may be an error coming from 
the communication
-              //   layer, but, as well, a functional error raised by the 
server.
-              receiveGlobalFailure(initialActions, multiAction, loc, 
numAttempt, e,
-                  errorsByServer);
-              return;
-            } catch (Throwable t) {
-              // This should not happen. Let's log & retry anyway.
-              LOG.error("#" + id + ", Caught throwable while calling. This is 
unexpected." +
-                  " Retrying. Server is " + loc.getServerName() + ", 
tableName=" + tableName, t);
-              receiveGlobalFailure(initialActions, multiAction, loc, 
numAttempt, t,
-                  errorsByServer);
-              return;
-            }
+            res = createCaller(callable).callWithoutRetries(callable, timeout);
+          } catch (IOException e) {
+            // The service itself failed . It may be an error coming from the 
communication
+            //   layer, but, as well, a functional error raised by the server.
+            receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, 
e,
+                errorsByServer);
+            return;
+          } catch (Throwable t) {
+            // This should not happen. Let's log & retry anyway.
+            LOG.error("#" + id + ", Caught throwable while calling. This is 
unexpected." +
+                " Retrying. Server is " + loc.getServerName() + ", tableName=" 
+ tableName, t);
+            receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, 
t,
+                errorsByServer);
+            return;
+          }
 
-            // Nominal case: we received an answer from the server, and it's 
not an exception.
-            receiveMultiAction(initialActions, multiAction, loc, res, 
numAttempt, errorsByServer);
+          // Nominal case: we received an answer from the server, and it's not 
an exception.
+          receiveMultiAction(initialActions, multiAction, loc, res, 
numAttempt, errorsByServer,
+            batchCallback);
 
-          } finally {
-            decTaskCounters(multiAction.getRegions(), loc.getServerName());
-          }
+        } finally {
+          decTaskCounters(multiAction.getRegions(), loc.getServerName());
         }
-      });
-
-      try {
-        this.pool.submit(runnable);
-      } catch (RejectedExecutionException ree) {
-        // This should never happen. But as the pool is provided by the end 
user, let's secure
-        //  this a little.
-        decTaskCounters(multiAction.getRegions(), loc.getServerName());
-        LOG.warn("#" + id + ", the task was rejected by the pool. This is 
unexpected." +
-            " Server is " + loc.getServerName(), ree);
-        // We're likely to fail again, but this will increment the attempt 
counter, so it will
-        //  finish.
-        receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, 
ree, errorsByServer);
       }
+    };
+  }
+
+  private Collection<? extends Runnable> getNewMultiActionRunnable(
+      final List<Action<Row>> initialActions,
+      final HRegionLocation loc,
+      final MultiAction<Row> multiAction,
+      final int numAttempt,
+      final HConnectionManager.ServerErrorTracker errorsByServer,
+      final Batch.Callback<CResult> batchCallback) {
+    // no stats to manage, just do the standard action
+    if (!(AsyncProcess.this.hConnection instanceof StatisticsHConnection) ||
+        
((StatisticsHConnection)AsyncProcess.this.hConnection).getStatisticsTracker() 
== null) {
+      List<Runnable> toReturn = new ArrayList<Runnable>(1);
+      toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", 
+        getNewSingleServerRunnable(initialActions, loc, multiAction, 
numAttempt,
+          errorsByServer, batchCallback)));
+      return toReturn;
+    } else {
+      // group the actions by the amount of delay
+      Map<Long, DelayingRunner> actions = new HashMap<Long, 
DelayingRunner>(multiAction
+        .size());
+
+      // split up the actions
+      for (Map.Entry<byte[], List<Action<Row>>> e : 
multiAction.actions.entrySet()) {
+        Long backoff = getBackoff(loc);
+        DelayingRunner runner = actions.get(backoff);
+        if (runner == null) {
+          actions.put(backoff, new DelayingRunner(backoff, e));
+        } else {
+          runner.add(e);
+        }
+      }
+
+      List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
+      for (DelayingRunner runner : actions.values()) {
+        String traceText = "AsyncProcess.sendMultiAction";
+        Runnable runnable = getNewSingleServerRunnable(initialActions, loc, 
runner.getActions(),
+          numAttempt, errorsByServer, batchCallback);
+        // use a delay runner only if we need to sleep for some time
+        if (runner.getSleepTime() > 0) {
+          runner.setRunner(runnable);
+          traceText = "AsyncProcess.clientBackoff.sendMultiAction";
+          runnable = runner;
+        }
+        runnable = Trace.wrap(traceText, runnable);
+        toReturn.add(runnable);
+      }
+      return toReturn;
     }
   }
 
   /**
+   * @param server server location where the target region is hosted
+   * @param regionName name of the region which we are going to write some data
+   * @return the amount of time the client should wait until it submit a 
request to the
+   * specified server and region
+   */
+  private Long getBackoff(HRegionLocation location) {
+    Preconditions.checkState(AsyncProcess.this.hConnection instanceof 
StatisticsHConnection,
+      "AsyncProcess connection should be a StatisticsHConnection");
+    ServerStatisticTracker tracker = 
((StatisticsHConnection)AsyncProcess.this.hConnection)
+      .getStatisticsTracker();
+    ServerStatistics stats = tracker.getStats(location.getServerName());
+    return 
((StatisticsHConnection)AsyncProcess.this.hConnection).getBackoffPolicy()
+      .getBackoffTime(location.getServerName(), 
location.getRegionInfo().getRegionName(),
+        stats);
+  }
+
+  /**
    * Create a callable. Isolated to be easily overridden in the tests.
    */
   protected MultiServerCallable<Row> createCallable(final HRegionLocation 
location,
@@ -738,7 +833,8 @@ class AsyncProcess<CResult> {
   private void receiveMultiAction(List<Action<Row>> initialActions, 
MultiAction<Row> multiAction,
                                   HRegionLocation location,
                                   MultiResponse responses, int numAttempt,
-                                  HConnectionManager.ServerErrorTracker 
errorsByServer) {
+                                  HConnectionManager.ServerErrorTracker 
errorsByServer,
+                                  Batch.Callback<CResult> batchCallback) {
      assert responses != null;
 
     // Success or partial success
@@ -780,12 +876,17 @@ class AsyncProcess<CResult> {
             toReplay.add(correspondingAction);
           }
         } else { // success
-          if (callback != null) {
+          if (callback != null || batchCallback != null) {
             int index = regionResult.getFirst();
             Action<Row> correspondingAction = initialActions.get(index);
             Row row = correspondingAction.getAction();
-            //noinspection unchecked
-            this.callback.success(index, resultsForRS.getKey(), row, (CResult) 
result);
+            if (callback != null) {
+              //noinspection unchecked
+              this.callback.success(index, resultsForRS.getKey(), row, 
(CResult) result);
+            }
+            if (batchCallback != null) {
+              batchCallback.update(resultsForRS.getKey(), row.getRow(), 
(CResult) result);
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index bea2a1b..7821f46 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -107,7 +107,10 @@ public class ClientScanner extends AbstractClientScanner {
      */
   public ClientScanner(final Configuration conf, final Scan scan, final 
TableName tableName,
       HConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, 
RpcRetryingCallerFactory.instantiate(conf),
+    this(conf, scan, tableName, connection,
+      RpcRetryingCallerFactory.instantiate(conf,
+        connection instanceof StatisticsHConnection ? 
+          ((StatisticsHConnection)connection).getStatisticsTracker() : null),
         RpcControllerFactory.instantiate(conf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 31df3b0..ad9380f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -83,7 +83,9 @@ public class ClientSmallScanner extends ClientScanner {
    */
   public ClientSmallScanner(final Configuration conf, final Scan scan,
       final TableName tableName, HConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, 
RpcRetryingCallerFactory.instantiate(conf),
+    this(conf, scan, tableName, connection, 
RpcRetryingCallerFactory.instantiate(conf,
+      connection instanceof StatisticsHConnection ?
+          ((StatisticsHConnection)connection).getStatisticsTracker() : null),
         RpcControllerFactory.instantiate(conf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
new file mode 100644
index 0000000..22c0461
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A wrapper for a runnable for a group of actions for a single regionserver.
+ * <p>
+ * This can be used to build up the actions that should be taken and then
+ * </p>
+ * <p>
+ * This class exists to simulate using a ScheduledExecutorService with just a 
regular
+ * ExecutorService and Runnables. It is used for legacy reasons in the the 
client; this could
+ * only be removed if we change the expectations in HTable around the pool the 
client is able to
+ * pass in and even if we deprecate the current APIs would require keeping 
this class around
+ * for the interim to bridge between the legacy ExecutorServices and the 
scheduled pool.
+ * </p>
+ */
[email protected]
+public class DelayingRunner<T> implements Runnable {
+  private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
+
+  private final Object sleepLock = new Object();
+  private boolean triggerWake = false;
+  private long sleepTime;
+  private MultiAction<T> actions = new MultiAction<T>();
+  private Runnable runnable;
+
+  public DelayingRunner(long sleepTime, Entry<byte[], List<Action<T>>> e) {
+    this.sleepTime = sleepTime;
+    add(e);
+  }
+
+  public void setRunner(Runnable runner) {
+    this.runnable = runner;
+  }
+
+  @Override
+  public void run() {
+    if (!sleep()) {
+      LOG.warn(
+          "Interrupted while sleeping for expected sleep time " + sleepTime + 
" ms");
+    }
+    //TODO maybe we should consider switching to a listenableFuture for the 
actual callable and
+    // then handling the results/errors as callbacks. That way we can 
decrement outstanding tasks
+    // even if we get interrupted here, but for now, we still need to run so 
we decrement the
+    // outstanding tasks
+    this.runnable.run();
+  }
+
+  /**
+   * Sleep for an expected amount of time.
+   * <p>
+   * This is nearly a copy of what the Sleeper does, but with the ability to 
know if you
+   * got interrupted while sleeping.
+   * </p>
+   *
+   * @return <tt>true</tt> if the sleep completely entirely successfully,
+   * but otherwise <tt>false</tt> if the sleep was interrupted.
+   */
+  private boolean sleep() {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = now;
+    long waitTime = sleepTime;
+    while (waitTime > 0) {
+      long woke = -1;
+      try {
+        synchronized (sleepLock) {
+          if (triggerWake) break;
+          sleepLock.wait(waitTime);
+        }
+        woke = EnvironmentEdgeManager.currentTimeMillis();
+      } catch (InterruptedException iex) {
+        return false;
+      }
+      // Recalculate waitTime.
+      woke = (woke == -1) ? EnvironmentEdgeManager.currentTimeMillis() : woke;
+      waitTime = waitTime - (woke - startTime);
+    }
+    return true;
+  }
+
+  public void add(Map.Entry<byte[], List<Action<T>>> e) {
+    actions.add(e.getKey(), e.getValue());
+  }
+
+  public MultiAction<T> getActions() {
+    return actions;
+  }
+
+  public long getSleepTime() {
+    return sleepTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index effa069..e3af5e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -214,7 +214,9 @@ public class HBaseAdmin implements Abortable, Closeable {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.retryLongerMultiplier = this.conf.getInt(
         "hbase.client.retries.longer.multiplier", 10);
-    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
+    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf,
+      connection instanceof StatisticsHConnection ?
+        ((StatisticsHConnection)connection).getStatisticsTracker() : null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index da7a122..b06dc6f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -592,7 +594,7 @@ public class HConnectionManager {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
       justification="Access to the conncurrent hash map is under a lock so 
should be fine.")
-  public static class HConnectionImplementation implements HConnection, 
Closeable {
+  public static class HConnectionImplementation implements 
StatisticsHConnection, Closeable {
     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
     private final long pause;
     private final int numTries;
@@ -664,6 +666,11 @@ public class HConnectionManager {
 
     private RpcControllerFactory rpcControllerFactory;
 
+    // single tracker per connection
+    private final ServerStatisticTracker stats;
+
+    private final ClientBackoffPolicy backoffPolicy;
+
     /**
      * Cluster registry of basic info such as clusterid and meta region 
location.
      */
@@ -720,7 +727,7 @@ public class HConnectionManager {
         }
       }
 
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, 
this.stats);
       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     }
 
@@ -760,13 +767,16 @@ public class HConnectionManager {
         this.nonceGenerator = new NoNonceGenerator();
       }
 
+      this.stats = ServerStatisticTracker.create(conf);
       this.usePrefetch = conf.getBoolean(HConstants.HBASE_CLIENT_PREFETCH,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH);
       this.prefetchRegionLimit = conf.getInt(
           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, 
this.stats);
+      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+      
     }
 
     @Override
@@ -2418,11 +2428,12 @@ public class HConnectionManager {
     // For tests.
     protected <R> AsyncProcess createAsyncProcess(TableName tableName, 
ExecutorService pool,
            AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
-      return new AsyncProcess<R>(this, tableName, pool, callback, conf,
-          RpcRetryingCallerFactory.instantiate(conf), 
RpcControllerFactory.instantiate(conf));
+      RpcControllerFactory controllerFactory = 
RpcControllerFactory.instantiate(conf);
+      RpcRetryingCallerFactory callerFactory = 
RpcRetryingCallerFactory.instantiate(conf, this.stats);
+      return new AsyncProcess<R>(this, tableName, pool, callback, conf, 
callerFactory,
+        controllerFactory);
     }
 
-
     /**
      * Fill the result array for the interfaces using it.
      */
@@ -2461,6 +2472,15 @@ public class HConnectionManager {
       }
     }
 
+    @Override
+    public ServerStatisticTracker getStatisticsTracker() {
+      return this.stats;
+    }
+
+    @Override
+    public ClientBackoffPolicy getBackoffPolicy() {
+      return this.backoffPolicy;
+    }
 
     /*
      * Return the number of cached region for a table. It will only be called

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 1d8a037..9cc6d7b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -379,7 +379,9 @@ public class HTable implements HTableInterface {
     this.scannerCaching = tableConfiguration.getScannerCaching();
 
     if (this.rpcCallerFactory == null) {
-      this.rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(configuration);
+      this.rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(configuration,
+        this.connection instanceof StatisticsHConnection ?
+           ((StatisticsHConnection)this.connection).getStatisticsTracker() : 
null);
     }
     if (this.rpcControllerFactory == null) {
       this.rpcControllerFactory = 
RpcControllerFactory.instantiate(configuration);

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 709e5fd..7f5a4ca 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -406,14 +406,16 @@ public class HTableMultiplexer {
     private final ScheduledExecutorService executor;
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
-    
+
     public FlushWorker(Configuration conf, HConnection conn, HRegionLocation 
addr,
         HTableMultiplexer multiplexer, int perRegionServerBufferQueueSize,
         ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
       this.multiplexer = multiplexer;
       this.queue = new 
LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-      RpcRetryingCallerFactory rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(conf);
+      RpcRetryingCallerFactory rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(conf,
+        conn instanceof StatisticsHConnection ?
+          ((StatisticsHConnection)conn).getStatisticsTracker() : null);
       RpcControllerFactory rpcControllerFactory = 
RpcControllerFactory.instantiate(conf);
       this.ap = new AsyncProcess<Object>(conn, null, pool, this, conf, 
rpcCallerFactory,
               rpcControllerFactory);
@@ -519,7 +521,7 @@ public class HTableMultiplexer {
           try {
             HConnectionManager.ServerErrorTracker errorsByServer =
                 new HConnectionManager.ServerErrorTracker(1, 10);
-            ap.sendMultiAction(retainedActions, actionsByServer, 10, 
errorsByServer);
+            ap.sendMultiAction(retainedActions, actionsByServer, 10, 
errorsByServer, null);
             ap.waitUntilDone();
 
             if (ap.hasError()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index e1cfc9a..0af351f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -76,6 +76,23 @@ public final class MultiAction<R> {
     rsActions.add(a);
   }
 
+  /**
+   * Add an list of Actions to this container based on it's regionName. If the 
regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName
+   * @param a
+   */
+  public void add(byte[] regionName, List<Action<R>> a) {
+    List<Action<R>> rsActions = actions.get(regionName);
+    if (rsActions == null) {
+      rsActions = new ArrayList<Action<R>>(a.size());
+      actions.put(regionName, rsActions);
+    }
+    rsActions.addAll(a);
+  }
+
   public void setNonceGroup(long nonceGroup) {
     this.nonceGroup = nonceGroup;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index b995e28..c220718 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -93,6 +94,7 @@ public class Result implements CellScannable, CellScanner {
    * Index for where we are when Result is acting as a {@link CellScanner}.
    */
   private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
+  private ClientProtos.RegionLoadStats loadStats;
 
   /**
    * Creates an empty Result w/ no KeyValue payload; returns null if you call 
{@link #rawCells()}.
@@ -839,4 +841,19 @@ public class Result implements CellScannable, CellScanner {
     this.exists = exists;
   }
 
+  /**
+   * Add load information about the region to the information about the result
+   * @param loadStats statistics about the current region from which this was 
returned
+   */
+  public void addResults(ClientProtos.RegionLoadStats loadStats) {
+    this.loadStats = loadStats;
+  }
+
+  /**
+   * @return the associated statistics about the region from which this was 
returned. Can be
+   * <tt>null</tt> if stats are disabled.
+   */
+  public ClientProtos.RegionLoadStats getStats() {
+    return loadStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
new file mode 100644
index 0000000..3398d7d
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * A {@link Result} with some statistics about the server/region status
+ */
[email protected]
+public final class ResultStatsUtil {
+
+  private ResultStatsUtil() {
+    //private ctor for util class
+  }
+
+  /**
+   * Update the stats for the specified region if the result is an instance of 
{@link
+   * ResultStatsUtil}
+   *
+   * @param r object that contains the result and possibly the statistics 
about the region
+   * @param serverStats stats tracker to update from the result
+   * @param server server from which the result was obtained
+   * @param regionName full region name for the stats.
+   * @return the underlying {@link Result} if the passed result is an {@link
+   * ResultStatsUtil} or just returns the result;
+   */
+  public static <T> T updateStats(T r, ServerStatisticTracker serverStats,
+      ServerName server, byte[] regionName) {
+    if (!(r instanceof Result)) {
+      return r;
+    }
+    Result result = (Result) r;
+    // early exit if there are no stats to collect
+    ClientProtos.RegionLoadStats stats = result.getStats();
+    if (stats == null) {
+      return r;
+    }
+    serverStats.updateRegionStats(server, regionName, stats);
+    return r;
+  }
+
+  public static <T> T updateStats(T r, ServerStatisticTracker stats,
+      HRegionLocation regionLocation) {
+    // Writes submitted using multi() will receive MultiResponses
+    if (r instanceof MultiResponse) {
+      MultiResponse mr = (MultiResponse) r;
+      for (Map.Entry<byte[], List<Pair<Integer, Object>>> e: 
mr.getResults().entrySet()) {
+        byte[] regionName = e.getKey();
+        for (Pair<Integer, Object> regionResult : e.getValue()) {
+          Object o = regionResult.getSecond();
+          if (o instanceof Result) {
+            Result result = (Result) o;
+            ClientProtos.RegionLoadStats loadStats = result.getStats();
+            if (loadStats != null) {
+              stats.updateRegionStats(regionLocation.getServerName(), 
regionName, loadStats);
+              // Once we have stats for one region we can move on to the next
+              break;
+            }
+          }
+        }
+      }
+    }
+    return r;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index 9552992..95fbb92 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -39,9 +39,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import com.google.protobuf.ServiceException;
 
 /**
- * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
- * threadlocal outstanding timeouts as so we don't persist too much.
- * Dynamic rather than static so can set the generic appropriately.
+ * Dynamic rather than static so can set the generic return type appropriately.
  */
 @InterfaceAudience.Private
 @edu.umd.cs.findbugs.annotations.SuppressWarnings

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 53d5c58..dd9c725 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * Factory to create an {@link RpcRetryingCaller}
  */
[email protected]
 public class RpcRetryingCallerFactory {
 
   /** Configuration key for a custom {@link RpcRetryingCaller} */
@@ -32,31 +34,62 @@ public class RpcRetryingCallerFactory {
   private final long pause;
   private final int retries;
   private final int startLogErrorsCnt;
+  private final boolean enableBackPressure;
+  private ServerStatisticTracker stats;
 
   public RpcRetryingCallerFactory(Configuration conf) {
+    this(conf, null);
+  }
+
+  public RpcRetryingCallerFactory(Configuration conf, ServerStatisticTracker 
stats) {
     this.conf = conf;
+    this.stats = stats;
     pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     startLogErrorsCnt = 
conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
         AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+    enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+      HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+  }
+
+  /**
+   * Set the tracker that should be used for tracking statistics about the 
server
+   */
+  public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
+    this.stats = statisticTracker;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new 
objects
     //  is cheap as it does not require parsing a complex structure.
-    return new RpcRetryingCaller<T>(pause, retries, startLogErrorsCnt);
+    RpcRetryingCaller<T> caller;
+    if (enableBackPressure && this.stats != null) {
+      caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, 
startLogErrorsCnt,
+        this.stats);
+    } else {
+      caller = new RpcRetryingCaller<T>(pause, retries, startLogErrorsCnt);
+    }
+    return caller;
   }
 
-  public static RpcRetryingCallerFactory instantiate(Configuration 
configuration) {
+  public static RpcRetryingCallerFactory instantiate(Configuration 
configuration,
+      ServerStatisticTracker stats) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, 
clazzName);
     if (rpcCallerFactoryClazz.equals(clazzName)) {
-      return new RpcRetryingCallerFactory(configuration);
+      return new RpcRetryingCallerFactory(configuration, stats);
+    }
+    try {
+      return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
+        new Class[] { Configuration.class, ServerStatisticTracker.class },
+        new Object[] { configuration, stats });
+    } catch (UnsupportedOperationException e) {
+      return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
+        new Class[] { Configuration.class },
+        new Object[] { configuration });
     }
-    return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
-      new Class[] { Configuration.class }, new Object[] { configuration });
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
new file mode 100644
index 0000000..0c7b683
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks the statistics for multiple regions
+ */
[email protected]
+public class ServerStatisticTracker {
+
+  private final Map<ServerName, ServerStatistics> stats =
+      new ConcurrentHashMap<ServerName, ServerStatistics>();
+
+  public void updateRegionStats(ServerName server, byte[] region, 
ClientProtos.RegionLoadStats
+      currentStats) {
+    ServerStatistics stat = stats.get(server);
+
+    if (stat == null) {
+      // create a stats object and update the stats
+      synchronized (this) {
+        stat = stats.get(server);
+        // we don't have stats for that server yet, so we need to make some
+        if (stat == null) {
+          stat = new ServerStatistics();
+          stats.put(server, stat);
+        }
+      }
+    }
+    stat.update(region, currentStats);
+  }
+
+  public ServerStatistics getStats(ServerName server) {
+    return this.stats.get(server);
+  }
+
+  public static ServerStatisticTracker create(Configuration conf) {
+    if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
+        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) {
+      return null;
+    }
+    return new ServerStatisticTracker();
+  }
+
+  @VisibleForTesting
+  ServerStatistics getServerStatsForTesting(ServerName server) {
+    return stats.get(server);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java
new file mode 100644
index 0000000..79bf9f7
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+
+/**
+ * A server statistics tracking aware HConnection.
+ */
[email protected]
+public interface StatisticsHConnection extends HConnection {
+  /**
+   * @return the current statistics tracker associated with this connection
+   */
+  ServerStatisticTracker getStatisticsTracker();
+
+  /**
+   * @return the configured client backoff policy
+   */
+  ClientBackoffPolicy getBackoffPolicy();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
new file mode 100644
index 0000000..2a5f0ec
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * An {@link RpcRetryingCaller} that will update the per-region stats for the 
call on return,
+ * if stats are available
+ */
[email protected]
+public class StatsTrackingRpcRetryingCaller<T> extends RpcRetryingCaller<T> {
+  private final ServerStatisticTracker stats;
+
+  public StatsTrackingRpcRetryingCaller(long pause, int retries, int 
startLogErrorsCnt,
+      ServerStatisticTracker stats) {
+    super(pause, retries, startLogErrorsCnt);
+    this.stats = stats;
+  }
+
+  @Override
+  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = super.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  @Override
+  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = super.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
+    // don't track stats about requests that aren't to regionservers
+    if (!(callable instanceof RegionServerCallable)) {
+      return result;
+    }
+
+    RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
+    HRegionLocation location = regionCallable.getLocation();
+    return ResultStatsUtil.updateStats(result, stats, location);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
new file mode 100644
index 0000000..94e434f
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Configurable policy for the amount of time a client should wait for a new 
request to the
+ * server when given the server load statistics.
+ * <p>
+ * Must have a single-argument constructor that takes a {@link 
org.apache.hadoop.conf.Configuration}
+ * </p>
+ */
[email protected]
[email protected]
+public interface ClientBackoffPolicy {
+
+  public static final String BACKOFF_POLICY_CLASS =
+      "hbase.client.statistics.backoff-policy";
+
+  /**
+   * @return the number of ms to wait on the client based on the
+   */
+  public long getBackoffTime(ServerName serverName, byte[] region, 
ServerStatistics stats);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
new file mode 100644
index 0000000..879a0e2
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
[email protected]
[email protected]
+public final class ClientBackoffPolicyFactory {
+
+  private static final Log LOG = 
LogFactory.getLog(ClientBackoffPolicyFactory.class);
+
+  private ClientBackoffPolicyFactory() {
+  }
+
+  public static ClientBackoffPolicy create(Configuration conf) {
+    // create the backoff policy
+    String className =
+        conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, 
NoBackoffPolicy.class
+            .getName());
+      return ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class<?>[] { Configuration.class }, new Object[] { conf });
+  }
+
+  /**
+   * Default backoff policy that doesn't create any backoff for the client, 
regardless of load
+   */
+  public static class NoBackoffPolicy implements ClientBackoffPolicy {
+    public NoBackoffPolicy(Configuration conf){
+      // necessary to meet contract
+    }
+
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, 
ServerStatistics stats) {
+      return 0;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
new file mode 100644
index 0000000..6e75670
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple exponential backoff policy on for the client that uses a  percent^4 
times the
+ * max backoff to generate the backoff time.
+ */
[email protected]
[email protected]
+public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
+
+  private static final Log LOG = 
LogFactory.getLog(ExponentialClientBackoffPolicy.class);
+
+  private static final long ONE_MINUTE = 60 * 1000;
+  public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
+  public static final String MAX_BACKOFF_KEY = 
"hbase.client.exponential-backoff.max";
+  private long maxBackoff;
+
+  public ExponentialClientBackoffPolicy(Configuration conf) {
+    this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
+  }
+
+  @Override
+  public long getBackoffTime(ServerName serverName, byte[] region, 
ServerStatistics stats) {
+    // no stats for the server yet, so don't backoff
+    if (stats == null) {
+      return 0;
+    }
+
+    ServerStatistics.RegionStatistics regionStats = 
stats.getStatsForRegion(region);
+    // no stats for the region yet - don't backoff
+    if (regionStats == null) {
+      return 0;
+    }
+
+    // square the percent as a value less than 1. Closer we move to 100 
percent,
+    // the percent moves to 1, but squaring causes the exponential curve
+    double percent = regionStats.getMemstoreLoadPercent() / 100.0;
+    double multiplier = Math.pow(percent, 4.0);
+    // shouldn't ever happen, but just incase something changes in the 
statistic data
+    if (multiplier > 1) {
+      LOG.warn("Somehow got a backoff multiplier greater than the allowed 
backoff. Forcing back " +
+          "down to the max backoff");
+      multiplier = 1;
+    }
+    return (long) (multiplier * maxBackoff);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
new file mode 100644
index 0000000..a3b8e11
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Track the statistics for a single region
+ */
[email protected]
+public class ServerStatistics {
+
+  private Map<byte[], RegionStatistics>
+      stats = new TreeMap<byte[], RegionStatistics>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Good enough attempt. Last writer wins. It doesn't really matter which one 
gets to update,
+   * as something gets set
+   * @param region
+   * @param currentStats
+   */
+  public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) 
{
+    RegionStatistics regionStat = this.stats.get(region);
+    if(regionStat == null){
+      regionStat = new RegionStatistics();
+      this.stats.put(region, regionStat);
+    }
+
+    regionStat.update(currentStats);
+  }
+
+  @InterfaceAudience.Private
+  public RegionStatistics getStatsForRegion(byte[] regionName){
+    return stats.get(regionName);
+  }
+
+  public static class RegionStatistics{
+    private int memstoreLoad = 0;
+
+    public void update(ClientProtos.RegionLoadStats currentStats) {
+      this.memstoreLoad = currentStats.getMemstoreLoad();
+    }
+
+    public int getMemstoreLoadPercent(){
+      return this.memstoreLoad;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 367d5f9..887c4c0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -116,19 +116,23 @@ public final class ResponseConverter {
       }
 
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+        Object responseValue;
         if (roe.hasException()) {
-          results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
-              ProtobufUtil.toException(roe.getException())));
+          responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {
-          results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
-              ProtobufUtil.toResult(roe.getResult(), cells)));
+          responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
+          // add the load stats, if we got any
+          if (roe.hasLoadStats()) {
+            ((Result) responseValue).addResults(roe.getLoadStats());
+          }
         } else if (roe.hasServiceResult()) {
-          results.add(regionName, roe.getIndex(), roe.getServiceResult());
+          responseValue = roe.getServiceResult();
         } else {
           // no result & no exception. Unexpected.
           throw new IllegalStateException("No result & no exception roe=" + 
roe +
               " for region " + actions.getRegion());
         }
+        results.add(regionName, roe.getIndex(), responseValue);
       }
     }
 
@@ -151,11 +155,14 @@ public final class ResponseConverter {
    * Wrap a throwable to an action result.
    *
    * @param r
+   * @param stats
    * @return an action result builder
    */
-  public static ResultOrException.Builder buildActionResult(final 
ClientProtos.Result r) {
+  public static ResultOrException.Builder buildActionResult(final 
ClientProtos.Result r,
+      ClientProtos.RegionLoadStats stats) {
     ResultOrException.Builder builder = ResultOrException.newBuilder();
     if (r != null) builder.setResult(r);
+    if (stats != null) builder.setLoadStats(stats);
     return builder;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
new file mode 100644
index 0000000..24cb661
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientExponentialBackoff {
+
+  ServerName server = Mockito.mock(ServerName.class);
+  byte[] regionname = Bytes.toBytes("region");
+
+  @Test
+  public void testNulls() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new 
ExponentialClientBackoffPolicy(conf);
+    assertEquals(0, backoff.getBackoffTime(null, null, null));
+
+    // server name doesn't matter to calculation, but check it now anyways
+    assertEquals(0, backoff.getBackoffTime(server, null, null));
+    assertEquals(0, backoff.getBackoffTime(server, regionname, null));
+
+    // check when no stats for the region yet
+    ServerStatistics stats = new ServerStatistics();
+    assertEquals(0, backoff.getBackoffTime(server, regionname, stats));
+  }
+
+  @Test
+  public void testMaxLoad() {
+    Configuration conf = new Configuration(false);
+    ExponentialClientBackoffPolicy backoff = new 
ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    update(stats, 100);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, 
backoff.getBackoffTime(server,
+        regionname, stats));
+
+    // another policy with a different max timeout
+    long max = 100;
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max);
+    ExponentialClientBackoffPolicy backoffShortTimeout = new 
ExponentialClientBackoffPolicy(conf);
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, 
stats));
+
+    // test beyond 100 still doesn't exceed the max
+    update(stats, 101);
+    assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, 
backoff.getBackoffTime(server,
+        regionname, stats));
+    assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, 
stats));
+
+    // and that when we are below 100, its less than the max timeout
+    update(stats, 99);
+    assertTrue(backoff.getBackoffTime(server,
+        regionname, stats) < 
ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
+    assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < 
max);
+  }
+
+  /**
+   * Make sure that we get results in the order that we expect - backoff for a 
load of 1 should
+   * less than backoff for 10, which should be less than that for 50.
+   */
+  @Test
+  public void testResultOrdering() {
+    Configuration conf = new Configuration(false);
+    // make the max timeout really high so we get differentiation between load 
factors
+    conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, 
Integer.MAX_VALUE);
+    ExponentialClientBackoffPolicy backoff = new 
ExponentialClientBackoffPolicy(conf);
+
+    ServerStatistics stats = new ServerStatistics();
+    long previous = backoff.getBackoffTime(server, regionname, stats);
+    for (int i = 1; i <= 100; i++) {
+      update(stats, i);
+      long next = backoff.getBackoffTime(server, regionname, stats);
+      assertTrue(
+          "Previous backoff time" + previous + " >= " + next + ", the next 
backoff time for " +
+              "load " + i, previous < next);
+      previous = next;
+    }
+  }
+
+  private void update(ServerStatistics stats, int load) {
+    ClientProtos.RegionLoadStats stat = 
ClientProtos.RegionLoadStats.newBuilder()
+        .setMemstoreLoad
+            (load).build();
+    stats.update(regionname, stat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
new file mode 100644
index 0000000..308d7ef
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestDelayingRunner {
+
+  private static final TableName DUMMY_TABLE =
+      TableName.valueOf("DUMMY_TABLE");
+  private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
+  private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
+  private static HRegionInfo hri1 =
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testDelayingRunner() throws Exception{
+    MultiAction<Row> ma = new MultiAction<Row>();
+    ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0));
+    final AtomicLong endTime = new AtomicLong();
+    final long sleepTime = 1000;
+    DelayingRunner runner = new DelayingRunner(sleepTime, 
ma.actions.entrySet().iterator().next());
+    runner.setRunner(new Runnable() {
+      @Override
+      public void run() {
+        endTime.set(EnvironmentEdgeManager.currentTimeMillis());
+      }      
+    });
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    runner.run();
+    long delay = endTime.get() - startTime;
+    assertTrue("DelayingRunner did not delay long enough", delay >= sleepTime);
+    assertFalse("DelayingRunner delayed too long", delay > sleepTime + 
sleepTime*0.2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85e7270b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 269a285..c181ad5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1037,6 +1037,11 @@ public final class HConstants {
   /** Configuration key for setting replication codec class name */
   public static final String REPLICATION_CODEC_CONF_KEY = 
"hbase.replication.rpc.codec";
 
+  /** Config key for if the server should send backpressure and if the client 
should listen to
+   * that backpressure from the server */
+  public static final String ENABLE_CLIENT_BACKPRESSURE = 
"hbase.client.backpressure.enabled";
+  public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Reply via email to