HBASE-14946 Don't allow multi's to over run the max result size.

Summary:
* Add VersionInfoUtil to determine if a client has a specified version or better
* Add an exception type to say that the response should be chunked
* Add on client knowledge of retry exceptions
* Add on metrics for how often this happens

Test Plan: Added a unit test

Differential Revision: https://reviews.facebook.net/D51771


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48e217a7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48e217a7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48e217a7

Branch: refs/heads/hbase-12439
Commit: 48e217a7db8c23501ea4934d28e57684b82d71fb
Parents: c15e0af
Author: Elliott Clark <ecl...@apache.org>
Authored: Mon Dec 7 18:33:35 2015 -0800
Committer: Elliott Clark <ecl...@apache.org>
Committed: Thu Dec 10 18:10:32 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/MultiActionResultTooLarge.java |  31 ++++
 .../hadoop/hbase/RetryImmediatelyException.java |  27 ++++
 .../hadoop/hbase/client/AsyncProcess.java       |  86 ++++++++---
 .../hbase/client/ConnectionImplementation.java  |   7 +-
 .../org/apache/hadoop/hbase/client/Result.java  |   3 +
 .../hbase/ipc/MetricsHBaseServerSource.java     |   8 +-
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java |   9 ++
 .../hadoop/hbase/client/VersionInfoUtil.java    |  63 ++++++++
 .../hadoop/hbase/ipc/MetricsHBaseServer.java    |   3 +
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |  23 ++-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  27 +++-
 .../master/procedure/ProcedurePrepareLatch.java |  23 +--
 .../hbase/regionserver/RSRpcServices.java       | 154 ++++++++++++-------
 .../hbase/client/TestMultiRespectsLimits.java   | 102 ++++++++++++
 14 files changed, 462 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
new file mode 100644
index 0000000..d06eea1
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Exception thrown when the result needs to be chunked on the server side.
+ * It signals that retries should happen right away and not count against the 
number of
+ * retries because some of the multi was a success.
+ */
+public class MultiActionResultTooLarge extends RetryImmediatelyException {
+
+  public MultiActionResultTooLarge(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
new file mode 100644
index 0000000..1b39904
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class RetryImmediatelyException extends IOException {
+  public RetryImmediatelyException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index f1fa3eb..5102ec5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -126,19 +127,35 @@ class AsyncProcess {
     public void waitUntilDone() throws InterruptedIOException;
   }
 
-  /** Return value from a submit that didn't contain any requests. */
+  /**
+   * Return value from a submit that didn't contain any requests.
+   */
   private static final AsyncRequestFuture NO_REQS_RESULT = new 
AsyncRequestFuture() {
     final Object[] result = new Object[0];
+
     @Override
-    public boolean hasError() { return false; }
+    public boolean hasError() {
+      return false;
+    }
+
     @Override
-    public RetriesExhaustedWithDetailsException getErrors() { return null; }
+    public RetriesExhaustedWithDetailsException getErrors() {
+      return null;
+    }
+
     @Override
-    public List<? extends Row> getFailedOperations() { return null; }
+    public List<? extends Row> getFailedOperations() {
+      return null;
+    }
+
     @Override
-    public Object[] getResults() { return result; }
+    public Object[] getResults() {
+      return result;
+    }
+
     @Override
-    public void waitUntilDone() throws InterruptedIOException {}
+    public void waitUntilDone() throws InterruptedIOException {
+    }
   };
 
   /** Sync point for calls to multiple replicas for the same user request 
(Get).
@@ -308,8 +325,12 @@ class AsyncProcess {
    *         RuntimeException
    */
   private ExecutorService getPool(ExecutorService pool) {
-    if (pool != null) return pool;
-    if (this.pool != null) return this.pool;
+    if (pool != null) {
+      return pool;
+    }
+    if (this.pool != null) {
+      return this.pool;
+    }
     throw new RuntimeException("Neither AsyncProcess nor request have 
ExecutorService");
   }
 
@@ -367,7 +388,9 @@ class AsyncProcess {
         Row r = it.next();
         HRegionLocation loc;
         try {
-          if (r == null) throw new IllegalArgumentException("#" + id + ", row 
cannot be null");
+          if (r == null) {
+            throw new IllegalArgumentException("#" + id + ", row cannot be 
null");
+          }
           // Make sure we get 0-s replica.
           RegionLocations locs = connection.locateRegion(
               tableName, r.getRow(), true, true, 
RegionReplicaUtil.DEFAULT_REPLICA_ID);
@@ -730,10 +753,10 @@ class AsyncProcess {
           // Normal case: we received an answer from the server, and it's not 
an exception.
           receiveMultiAction(multiAction, server, res, numAttempt);
         } catch (Throwable t) {
-              // Something really bad happened. We are on the send thread that 
will now die.
-              LOG.error("Internal AsyncProcess #" + id + " error for "
-                  + tableName + " processing for " + server, t);
-              throw new RuntimeException(t);
+          // Something really bad happened. We are on the send thread that 
will now die.
+          LOG.error("Internal AsyncProcess #" + id + " error for "
+              + tableName + " processing for " + server, t);
+          throw new RuntimeException(t);
         } finally {
           decTaskCounters(multiAction.getRegions(), server);
           if (callsInProgress != null && callable != null) {
@@ -752,19 +775,25 @@ class AsyncProcess {
 
     private final TableName tableName;
     private final AtomicLong actionsInProgress = new AtomicLong(-1);
-    /** The lock controls access to results. It is only held when populating 
results where
+    /**
+     * The lock controls access to results. It is only held when populating 
results where
      * there might be several callers (eventual consistency gets). For other 
requests,
-     * there's one unique call going on per result index. */
+     * there's one unique call going on per result index.
+     */
     private final Object replicaResultLock = new Object();
-    /** Result array.  Null if results are not needed. Otherwise, each index 
corresponds to
+    /**
+     * Result array.  Null if results are not needed. Otherwise, each index 
corresponds to
      * the action index in initial actions submitted. For most request types, 
has null-s for
      * requests that are not done, and result/exception for those that are 
done.
      * For eventual-consistency gets, initially the same applies; at some 
point, replica calls
      * might be started, and ReplicaResultState is put at the corresponding 
indices. The
      * returning calls check the type to detect when this is the case. After 
all calls are done,
-     * ReplicaResultState-s are replaced with results for the user. */
+     * ReplicaResultState-s are replaced with results for the user.
+     */
     private final Object[] results;
-    /** Indices of replica gets in results. If null, all or no actions are 
replica-gets. */
+    /**
+     * Indices of replica gets in results. If null, all or no actions are 
replica-gets.
+     */
     private final int[] replicaGetIndices;
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
@@ -779,7 +808,9 @@ class AsyncProcess {
       this.actionsInProgress.set(actions.size());
       if (results != null) {
         assert needResults;
-        if (results.length != actions.size()) throw new 
AssertionError("results.length");
+        if (results.length != actions.size()) {
+          throw new AssertionError("results.length");
+        }
         this.results = results;
         for (int i = 0; i != this.results.length; ++i) {
           results[i] = null;
@@ -1178,9 +1209,13 @@ class AsyncProcess {
       // We have two contradicting needs here:
       //  1) We want to get the new location after having slept, as it may 
change.
       //  2) We want to take into account the location when calculating the 
sleep time.
+      //  3) If all this is just because the response needed to be chunked try 
again FAST.
       // It should be possible to have some heuristics to take the right 
decision. Short term,
       //  we go for one.
-      long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
+      boolean retryImmediately = throwable instanceof 
RetryImmediatelyException;
+      int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
+      long backOffTime = retryImmediately ? 0 :
+          errorsByServer.calculateBackoffTime(oldServer, pause);
       if (numAttempt > startLogErrorsCnt) {
         // We use this value to have some logs when we have multiple failures, 
but not too many
         //  logs, as errors are to be expected when a region moves, splits and 
so on
@@ -1189,14 +1224,16 @@ class AsyncProcess {
       }
 
       try {
-        Thread.sleep(backOffTime);
+        if (backOffTime > 0) {
+          Thread.sleep(backOffTime);
+        }
       } catch (InterruptedException e) {
         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " 
+ oldServer, e);
         Thread.currentThread().interrupt();
         return;
       }
 
-      groupAndSendMultiAction(toReplay, numAttempt + 1);
+      groupAndSendMultiAction(toReplay, nextAttemptNumber);
     }
 
     private void logNoResubmit(ServerName oldServer, int numAttempt,
@@ -1256,6 +1293,7 @@ class AsyncProcess {
           // Failure: retry if it's make sense else update the errors lists
           if (result == null || result instanceof Throwable) {
             Row row = sentAction.getAction();
+            throwable = ConnectionImplementation.findException(result);
             // Register corresponding failures once per server/once per region.
             if (!regionFailureRegistered) {
               regionFailureRegistered = true;
@@ -1405,7 +1443,9 @@ class AsyncProcess {
       // will either see state with callCount 0 after locking it; or will not 
see state at all
       // we will replace it with the result.
       synchronized (state) {
-        if (state.callCount == 0) return; // someone already set the result
+        if (state.callCount == 0) {
+          return; // someone already set the result
+        }
         state.callCount = 0;
       }
       synchronized (replicaResultLock) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 62a7998..0ef2a17 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -298,7 +300,8 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     Throwable cur = (Throwable) exception;
     while (cur != null) {
       if (cur instanceof RegionMovedException || cur instanceof 
RegionOpeningException
-          || cur instanceof RegionTooBusyException || cur instanceof 
ThrottlingException) {
+          || cur instanceof RegionTooBusyException || cur instanceof 
ThrottlingException
+          || cur instanceof RetryImmediatelyException) {
         return cur;
       }
       if (cur instanceof RemoteException) {
@@ -1929,7 +1932,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     Throwable cause = findException(exception);
     if (cause != null) {
       if (cause instanceof RegionTooBusyException || cause instanceof 
RegionOpeningException
-          || cause instanceof ThrottlingException) {
+          || cause instanceof ThrottlingException || cause instanceof 
MultiActionResultTooLarge) {
         // We know that the region is still on this region server
         return;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 702983b..d2a49c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -839,6 +839,9 @@ public class Result implements CellScannable, CellScanner {
    */
   public static long getTotalSizeOfCells(Result result) {
     long size = 0;
+    if (result.isEmpty()) {
+      return size;
+    }
     for (Cell c : result.rawCells()) {
       size += CellUtil.estimatedHeapSizeOf(c);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 5cf71f3..061a672 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -74,6 +74,9 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
   String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
   String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
+  String EXCEPTIONS_MULTI_TOO_LARGE_NAME = "exceptions.multiResponseTooLarge";
+  String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was 
too large and the " +
+      "rest of the requests will have to be retried.";
 
   void authorizationSuccess();
 
@@ -96,6 +99,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   void notServingRegionException();
   void unknownScannerException();
   void tooBusyException();
+  void multiActionTooLargeException();
 
   void sentBytes(long count);
 
@@ -110,4 +114,6 @@ public interface MetricsHBaseServerSource extends 
BaseSource {
   void processedCall(int processingTime);
 
   void queuedAndProcessedCall(int totalTime);
-  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 8984394..487f9f5 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
     implements MetricsHBaseServerSource {
 
+
   private final MetricsHBaseServerWrapper wrapper;
   private final MutableCounterLong authorizationSuccesses;
   private final MutableCounterLong authorizationFailures;
@@ -47,6 +48,7 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
   private final MutableCounterLong exceptionsSanity;
   private final MutableCounterLong exceptionsNSRE;
   private final MutableCounterLong exceptionsMoved;
+  private final MutableCounterLong exceptionsMultiTooLarge;
 
 
   private MutableHistogram queueCallTime;
@@ -81,6 +83,8 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
         .newCounter(EXCEPTIONS_MOVED_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsNSRE = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_NSRE_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+    this.exceptionsMultiTooLarge = this.getMetricsRegistry()
+        .newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, 
EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
 
     this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
         AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
@@ -160,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
   }
 
   @Override
+  public void multiActionTooLargeException() {
+    exceptionsMultiTooLarge.incr();
+  }
+
+  @Override
   public void authenticationSuccess() {
     authenticationSuccesses.incr();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
new file mode 100644
index 0000000..c405518
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+
+/**
+ * Class to help with parsing the version info.
+ */
+@InterfaceAudience.Private
+public final class VersionInfoUtil {
+
+  private VersionInfoUtil() {
+    /* UTIL CLASS ONLY */
+  }
+
+  public static boolean currentClientHasMinimumVersion(int major, int minor) {
+    RpcCallContext call = RpcServer.getCurrentCall();
+    HBaseProtos.VersionInfo versionInfo = call != null ? 
call.getClientVersionInfo() : null;
+    return hasMinimumVersion(versionInfo, major, minor);
+  }
+
+  public static boolean hasMinimumVersion(HBaseProtos.VersionInfo versionInfo,
+                                          int major,
+                                          int minor) {
+    if (versionInfo != null) {
+      try {
+        String[] components = versionInfo.getVersion().split("\\.");
+
+        int clientMajor = components.length > 0 ? 
Integer.parseInt(components[0]) : 0;
+        if (clientMajor != major) {
+          return clientMajor > major;
+        }
+
+        int clientMinor = components.length > 1 ? 
Integer.parseInt(components[1]) : 0;
+        return clientMinor >= minor;
+      } catch (NumberFormatException e) {
+        return false;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index d276503..05bebb8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.UnknownScannerException;
@@ -105,6 +106,8 @@ public class MetricsHBaseServer {
         source.notServingRegionException();
       } else if (throwable instanceof FailedSanityCheckException) {
         source.failedSanityException();
+      } else if (throwable instanceof MultiActionResultTooLarge) {
+        source.multiActionTooLargeException();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 60e5f5d..d14e9b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.net.InetAddress;
 
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.security.User;
 
-
+@InterfaceAudience.Private
 public interface RpcCallContext extends Delayable {
   /**
    * Check if the caller who made this IPC call has disconnected.
@@ -40,7 +41,7 @@ public interface RpcCallContext extends Delayable {
    * support cellblocks while fielding requests from clients that do not.
    * @return True if the client supports cellblocks, else return all content 
in pb
    */
-  boolean isClientCellBlockSupport();
+  boolean isClientCellBlockSupported();
 
   /**
    * Returns the user credentials associated with the current RPC request or
@@ -71,4 +72,22 @@ public interface RpcCallContext extends Delayable {
    * @param callback
    */
   void setCallBack(RpcCallback callback);
+
+  boolean isRetryImmediatelySupported();
+
+  /**
+   * The size of response cells that have been accumulated so far.
+   * This along with the corresponding increment call is used to ensure that 
multi's or
+   * scans dont get too excessively large
+   */
+  long getResponseCellSize();
+
+  /**
+   * Add on the given amount to the retained cell size.
+   *
+   * This is not thread safe and not synchronized at all. If this is used by 
more than one thread
+   * then everything will break. Since this is called for every row 
synchronization would be too
+   * onerous.
+   */
+  void incrementResponseCellSize(long cellSize);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 0db7383..2bef247 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -317,6 +318,9 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     private InetAddress remoteAddress;
     private RpcCallback callback;
 
+    private long responseCellSize = 0;
+    private boolean retryImmediatelySupported;
+
     Call(int id, final BlockingService service, final MethodDescriptor md, 
RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, 
Responder responder,
          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
@@ -336,6 +340,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       this.tinfo = tinfo;
       this.user = connection.user;
       this.remoteAddress = remoteAddress;
+      this.retryImmediatelySupported = connection.retryImmediatelySupported;
     }
 
     /**
@@ -521,7 +526,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     }
 
     @Override
-    public boolean isClientCellBlockSupport() {
+    public boolean isClientCellBlockSupported() {
       return this.connection != null && this.connection.codec != null;
     }
 
@@ -538,6 +543,14 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       return this.size;
     }
 
+    public long getResponseCellSize() {
+      return responseCellSize;
+    }
+
+    public void incrementResponseCellSize(long cellSize) {
+      responseCellSize += cellSize;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is
@@ -578,6 +591,11 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     public void setCallBack(RpcCallback callback) {
       this.callback = callback;
     }
+
+    @Override
+    public boolean isRetryImmediatelySupported() {
+      return retryImmediatelySupported;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -1264,6 +1282,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
 
+    private boolean retryImmediatelySupported = false;
+
     public UserGroupInformation attemptingUser = null; // user name before auth
     protected User user = null;
     protected UserGroupInformation ugi = null;
@@ -1720,6 +1740,9 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         }
       }
       if (connectionHeader.hasVersionInfo()) {
+        // see if this connection will support RetryImmediatelyException
+        retryImmediatelySupported = 
VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
+
         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + 
this.remotePort
             + " with version info: "
             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
@@ -1727,6 +1750,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + 
this.remotePort
             + " with unknown version info");
       }
+
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index 052386a..b13e44d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -24,10 +24,8 @@ import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 
 /**
  * Latch used by the Master to have the prepare() sync behaviour for old
@@ -44,24 +42,7 @@ public abstract class ProcedurePrepareLatch {
   }
 
   public static boolean hasProcedureSupport() {
-    return currentClientHasMinimumVersion(1, 1);
-  }
-
-  private static boolean currentClientHasMinimumVersion(int major, int minor) {
-    RpcCallContext call = RpcServer.getCurrentCall();
-    VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : 
null;
-    if (versionInfo != null) {
-      String[] components = versionInfo.getVersion().split("\\.");
-
-      int clientMajor = components.length > 0 ? 
Integer.parseInt(components[0]) : 0;
-      if (clientMajor != major) {
-        return clientMajor > major;
-      }
-
-      int clientMinor = components.length > 1 ? 
Integer.parseInt(components[1]) : 0;
-      return clientMinor >= minor;
-    }
-    return false;
+    return VersionInfoUtil.currentClientHasMinimumVersion(1, 1);
   }
 
   protected abstract void countDown(final Procedure proc);

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 0c9b0e6..bba38f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -437,11 +438,11 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    */
   private boolean isClientCellBlockSupport() {
     RpcCallContext context = RpcServer.getCurrentCall();
-    return context != null && context.isClientCellBlockSupport();
+    return context != null && context.isClientCellBlockSupported();
   }
 
   private boolean isClientCellBlockSupport(RpcCallContext context) {
-    return context != null && context.isClientCellBlockSupport();
+    return context != null && context.isClientCellBlockSupported();
   }
 
   private void addResult(final MutateResponse.Builder builder, final Result 
result,
@@ -500,13 +501,13 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         rm = new RowMutations(action.getMutation().getRow().toByteArray());
       }
       switch (type) {
-      case PUT:
-        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
-        break;
-      case DELETE:
-        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
-        break;
-      default:
+        case PUT:
+          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+          break;
+        case DELETE:
+          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+          break;
+        default:
           throw new DoNotRetryIOException("Atomic put and/or delete only, not 
" + type.name());
       }
     }
@@ -543,14 +544,14 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         rm = new RowMutations(action.getMutation().getRow().toByteArray());
       }
       switch (type) {
-      case PUT:
-        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
-        break;
-      case DELETE:
-        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
-        break;
-      default:
-        throw new DoNotRetryIOException("Atomic put and/or delete only, not " 
+ type.name());
+        case PUT:
+          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+          break;
+        case DELETE:
+          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+          break;
+        default:
+          throw new DoNotRetryIOException("Atomic put and/or delete only, not 
" + type.name());
       }
     }
     return region.checkAndRowMutate(row, family, qualifier, compareOp, 
comparator, rm, Boolean.TRUE);
@@ -655,10 +656,42 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     // ResultOrException instance that matches each Put or Delete is then 
added down in the
     // doBatchOp call.  We should be staying aligned though the Put and Delete 
are deferred/batched
     List<ClientProtos.Action> mutations = null;
+    long maxQuotaResultSize = Math.min(maxScannerResultSize, 
quota.getReadAvailable());
+    IOException sizeIOE = null;
     for (ClientProtos.Action action : actions.getActionList()) {
       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
       try {
         Result r = null;
+
+        if (context != null
+            && context.isRetryImmediatelySupported()
+            && context.getResponseCellSize() > maxQuotaResultSize) {
+
+          // We're storing the exception since the exception and reason string 
won't
+          // change after the response size limit is reached.
+          if (sizeIOE == null ) {
+            // We don't need the stack un-winding do don't throw the exception.
+            // Throwing will kill the JVM's JIT.
+            //
+            // Instead just create the exception and then store it.
+            sizeIOE = new MultiActionResultTooLarge("Max response size 
exceeded: "
+                    + context.getResponseCellSize());
+
+            // Only report the exception once since there's only one request 
that
+            // caused the exception. Otherwise this number will dominate the 
exceptions count.
+            rpcServer.getMetrics().exception(sizeIOE);
+          }
+
+          // Now that there's an exception is know to be created
+          // use it for the response.
+          //
+          // This will create a copy in the builder.
+          resultOrExceptionBuilder = ResultOrException.newBuilder().
+              setException(ResponseConverter.buildException(sizeIOE));
+          resultOrExceptionBuilder.setIndex(action.getIndex());
+          builder.addResultOrException(resultOrExceptionBuilder.build());
+          continue;
+        }
         if (action.hasGet()) {
           Get get = ProtobufUtil.toGet(action.getGet());
           if (context != null) {
@@ -690,22 +723,22 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
             mutations.clear();
           }
           switch (type) {
-          case APPEND:
-            r = append(region, quota, action.getMutation(), cellScanner, 
nonceGroup);
-            break;
-          case INCREMENT:
-            r = increment(region, quota, action.getMutation(), cellScanner, 
nonceGroup);
-            break;
-          case PUT:
-          case DELETE:
-            // Collect the individual mutations and apply in a batch
-            if (mutations == null) {
-              mutations = new 
ArrayList<ClientProtos.Action>(actions.getActionCount());
-            }
-            mutations.add(action);
-            break;
-          default:
-            throw new DoNotRetryIOException("Unsupported mutate type: " + 
type.name());
+            case APPEND:
+              r = append(region, quota, action.getMutation(), cellScanner, 
nonceGroup);
+              break;
+            case INCREMENT:
+              r = increment(region, quota, action.getMutation(), cellScanner, 
nonceGroup);
+              break;
+            case PUT:
+            case DELETE:
+              // Collect the individual mutations and apply in a batch
+              if (mutations == null) {
+                mutations = new 
ArrayList<ClientProtos.Action>(actions.getActionCount());
+              }
+              mutations.add(action);
+              break;
+            default:
+              throw new DoNotRetryIOException("Unsupported mutate type: " + 
type.name());
           }
         } else {
           throw new HBaseIOException("Unexpected Action type");
@@ -715,11 +748,16 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
           if (isClientCellBlockSupport(context)) {
             pbResult = ProtobufUtil.toResultNoData(r);
             //  Hard to guess the size here.  Just make a rough guess.
-            if (cellsToReturn == null) cellsToReturn = new 
ArrayList<CellScannable>();
+            if (cellsToReturn == null) {
+              cellsToReturn = new ArrayList<CellScannable>();
+            }
             cellsToReturn.add(r);
           } else {
             pbResult = ProtobufUtil.toResult(r);
           }
+          if (context != null) {
+            context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
+          }
           resultOrExceptionBuilder =
             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
         }
@@ -801,8 +839,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
           case SUCCESS:
             builder.addResultOrException(getResultOrException(
-              ClientProtos.Result.getDefaultInstance(), index,
-                ((HRegion)region).getRegionStats()));
+                ClientProtos.Result.getDefaultInstance(), index,
+                ((HRegion) region).getRegionStats()));
             break;
         }
       }
@@ -951,13 +989,13 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
     try {
       rpcServer = new RpcServer(rs, name, getServices(),
-        bindAddress, // use final bindAddress for this server.
-        rs.conf,
-        rpcSchedulerFactory.create(rs.conf, this, rs));
-    } catch(BindException be) {
+          bindAddress, // use final bindAddress for this server.
+          rs.conf,
+          rpcSchedulerFactory.create(rs.conf, this, rs));
+    } catch (BindException be) {
       String configName = (this instanceof MasterRpcServices) ? 
HConstants.MASTER_PORT :
-        HConstants.REGIONSERVER_PORT;
-        throw new IOException(be.getMessage() + ". To switch ports use the '" 
+ configName +
+          HConstants.REGIONSERVER_PORT;
+      throw new IOException(be.getMessage() + ". To switch ports use the '" + 
configName +
           "' configuration property.", be.getCause() != null ? be.getCause() : 
be);
     }
 
@@ -2106,7 +2144,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = 
(PayloadCarryingRpcController)rpcc;
     CellScanner cellScanner = controller != null ? controller.cellScanner(): 
null;
-    if (controller != null) controller.setCellScanner(null);
+    if (controller != null) {
+      controller.setCellScanner(null);
+    }
 
     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : 
HConstants.NO_NONCE;
 
@@ -2180,7 +2220,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != 
null) {
       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
     }
-    if (processed != null) responseBuilder.setProcessed(processed);
+    if (processed != null) {
+      responseBuilder.setProcessed(processed);
+    }
     return responseBuilder.build();
   }
 
@@ -2197,10 +2239,12 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     // rpc controller is how we bring in data via the back door;  it is 
unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = 
(PayloadCarryingRpcController)rpcc;
-    CellScanner cellScanner = controller != null? controller.cellScanner(): 
null;
+    CellScanner cellScanner = controller != null ? controller.cellScanner() : 
null;
     OperationQuota quota = null;
     // Clear scanner so we are not holding on to reference across call.
-    if (controller != null) controller.setCellScanner(null);
+    if (controller != null) {
+      controller.setCellScanner(null);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2448,8 +2492,6 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
           // where processing of request takes > lease expiration time.
           lease = regionServer.leases.removeLease(scannerName);
           List<Result> results = new ArrayList<Result>();
-          long totalCellSize = 0;
-          long currentScanResultSize = 0;
 
           boolean done = false;
           // Call coprocessor. Get region info from scanner.
@@ -2459,8 +2501,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
             if (!results.isEmpty()) {
               for (Result r : results) {
                 for (Cell cell : r.rawCells()) {
-                  totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
-                  currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell);
+                  if (context != null) {
+                    
context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+                  }
                 }
               }
             }
@@ -2493,7 +2536,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
                 // If the coprocessor host is adding to the result list, we 
cannot guarantee the
                 // correct ordering of partial results and so we prevent 
partial results from being
                 // formed.
-                boolean serverGuaranteesOrderOfPartials = 
currentScanResultSize == 0;
+                boolean serverGuaranteesOrderOfPartials = results.isEmpty();
                 boolean allowPartialResults =
                     clientHandlesPartials && serverGuaranteesOrderOfPartials 
&& !isSmallScan;
                 boolean moreRows = false;
@@ -2559,7 +2602,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
 
                   if (!values.isEmpty()) {
                     for (Cell cell : values) {
-                      totalCellSize += 
CellUtil.estimatedSerializedSizeOf(cell);
+                      if (context != null) {
+                        
context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+                      }
                     }
                     final boolean partial = 
scannerContext.partialResultFormed();
                     results.add(Result.create(values, null, stale, partial));
@@ -2614,9 +2659,10 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
                 }
               }
               region.updateReadRequestsCount(i);
-              region.getMetrics().updateScanNext(totalCellSize);
+              long responseCellSize = context != null ? 
context.getResponseCellSize() : 0;
+              region.getMetrics().updateScanNext(responseCellSize);
               if (regionServer.metricsRegionServer != null) {
-                
regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
+                
regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
               }
             } finally {
               region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
new file mode 100644
index 0000000..47dd7be
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * This test sets the multi size WAAAAAY low and then checks to make sure that 
gets will still make
+ * progress.
+ */
+@Category({MediumTests.class, ClientTests.class})
+public class TestMultiRespectsLimits {
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static final MetricsAssertHelper METRICS_ASSERT =
+      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+  private final static byte[] FAMILY = Bytes.toBytes("D");
+  public static final int MAX_SIZE = 500;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setLong(
+        HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+        MAX_SIZE);
+
+    // Only start on regionserver so that all regions are on the same server.
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMultiLimits() throws Exception {
+    final TableName name = TableName.valueOf("testMultiLimits");
+    Table t = TEST_UTIL.createTable(name, FAMILY);
+    TEST_UTIL.loadTable(t, FAMILY, false);
+
+    // Split the table to make sure that the chunking happens accross regions.
+    try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+      admin.split(name);
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return admin.getTableRegions(name).size() > 1;
+        }
+      });
+    }
+    List<Get> gets = new ArrayList<>(MAX_SIZE);
+
+    for (int i = 0; i < MAX_SIZE; i++) {
+      gets.add(new Get(HBaseTestingUtility.ROWS[i]));
+    }
+    Result[] results = t.get(gets);
+    assertEquals(MAX_SIZE, results.length);
+    RpcServerInterface rpcServer = 
TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
+    BaseSource s = rpcServer.getMetrics().getMetricsSource();
+
+    // Cells from TEST_UTIL.loadTable have a length of 27.
+    // Multiplying by less than that gives an easy lower bound on size.
+    // However in reality each kv is being reported as much higher than that.
+    METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, 
s);
+    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+        (MAX_SIZE * 25) / MAX_SIZE, s);
+  }
+}

Reply via email to