Repository: hbase
Updated Branches:
  refs/heads/branch-1 630ad95c9 -> 421fe24e9


HBASE-15146 Don't block on Reader threads queueing to a scheduler queue


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/421fe24e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/421fe24e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/421fe24e

Branch: refs/heads/branch-1
Commit: 421fe24e9bb925e6199cc02118a5314458caeb38
Parents: 630ad95
Author: Elliott Clark <[email protected]>
Authored: Wed Jan 20 17:43:22 2016 -0800
Committer: Elliott Clark <[email protected]>
Committed: Thu Jan 28 08:11:11 2016 -0500

----------------------------------------------------------------------
 .../hadoop/hbase/CallQueueTooBigException.java  | 33 +++++++
 .../hadoop/hbase/client/AsyncProcess.java       |  3 +-
 .../hadoop/hbase/client/ConnectionManager.java  | 48 +---------
 .../hbase/exceptions/ClientExceptionsUtil.java  | 95 ++++++++++++++++++++
 .../hadoop/hbase/client/TestAsyncProcess.java   | 39 ++++++--
 .../exceptions/TestClientExceptionsUtil.java    | 37 ++++++++
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |  4 +-
 .../hadoop/hbase/ipc/FifoRpcScheduler.java      | 12 ++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |  4 +-
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    |  2 +-
 .../apache/hadoop/hbase/ipc/RpcScheduler.java   |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 21 +++--
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  8 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |  3 +-
 14 files changed, 240 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
new file mode 100644
index 0000000..d07c657
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@SuppressWarnings("serial")
[email protected]
[email protected]
+public class CallQueueTooBigException extends IOException {
+  public CallQueueTooBigException() {
+    super();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index e895a13..4069e49e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1293,7 +1294,7 @@ class AsyncProcess {
           // Failure: retry if it's make sense else update the errors lists
           if (result == null || result instanceof Throwable) {
             Row row = sentAction.getAction();
-            throwable = ConnectionManager.findException(result);
+            throwable = ClientExceptionsUtil.findException(result);
             // Register corresponding failures once per server/once per region.
             if (!regionFailureRegistered) {
               regionFailureRegistered = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 69c8767..7a298e2 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -71,6 +71,7 @@ import 
org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
@@ -2216,10 +2217,9 @@ class ConnectionManager {
       }
 
       HRegionInfo regionInfo = oldLocation.getRegionInfo();
-      Throwable cause = findException(exception);
+      Throwable cause = ClientExceptionsUtil.findException(exception);
       if (cause != null) {
-        if (cause instanceof RegionTooBusyException || cause instanceof 
RegionOpeningException
-            || cause instanceof ThrottlingException) {
+        if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
           // We know that the region is still on this region server
           return;
         }
@@ -2698,46 +2698,4 @@ class ConnectionManager {
       }
     }
   }
-
-  /**
-   * Look for an exception we know in the remote exception:
-   * - hadoop.ipc wrapped exceptions
-   * - nested exceptions
-   *
-   * Looks for: RegionMovedException / RegionOpeningException / 
RegionTooBusyException /
-   *            ThrottlingException
-   * @return null if we didn't find the exception, the exception otherwise.
-   */
-  public static Throwable findException(Object exception) {
-    if (exception == null || !(exception instanceof Throwable)) {
-      return null;
-    }
-    Throwable cur = (Throwable) exception;
-    while (cur != null) {
-      if (cur instanceof RegionMovedException || cur instanceof 
RegionOpeningException
-          || cur instanceof RegionTooBusyException || cur instanceof 
ThrottlingException
-          || cur instanceof RetryImmediatelyException) {
-        return cur;
-      }
-      if (cur instanceof RemoteException) {
-        RemoteException re = (RemoteException) cur;
-        cur = re.unwrapRemoteException(
-            RegionOpeningException.class, RegionMovedException.class,
-            RegionTooBusyException.class);
-        if (cur == null) {
-          cur = re.unwrapRemoteException();
-        }
-        // unwrapRemoteException can return the exception given as a parameter 
when it cannot
-        //  unwrap it. In this case, there is no need to look further
-        // noinspection ObjectEquality
-        if (cur == re) {
-          return null;
-        }
-      } else {
-        cur = cur.getCause();
-      }
-    }
-
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
new file mode 100644
index 0000000..ebf1499
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.ThrottlingException;
+import org.apache.hadoop.ipc.RemoteException;
+
[email protected]
[email protected]
+public final class ClientExceptionsUtil {
+
+  private ClientExceptionsUtil() {}
+
+  public static boolean isMetaClearingException(Throwable cur) {
+    cur = findException(cur);
+
+    if (cur == null) {
+      return true;
+    }
+    return !isSpecialException(cur) || (cur instanceof RegionMovedException);
+  }
+
+  public static boolean isSpecialException(Throwable cur) {
+    return (cur instanceof RegionMovedException || cur instanceof 
RegionOpeningException
+        || cur instanceof RegionTooBusyException || cur instanceof 
ThrottlingException
+        || cur instanceof MultiActionResultTooLarge || cur instanceof 
RetryImmediatelyException
+        || cur instanceof CallQueueTooBigException);
+  }
+
+
+  /**
+   * Look for an exception we know in the remote exception:
+   * - hadoop.ipc wrapped exceptions
+   * - nested exceptions
+   *
+   * Looks for: RegionMovedException / RegionOpeningException / 
RegionTooBusyException /
+   *            ThrottlingException
+   * @return null if we didn't find the exception, the exception otherwise.
+   */
+  public static Throwable findException(Object exception) {
+    if (exception == null || !(exception instanceof Throwable)) {
+      return null;
+    }
+    Throwable cur = (Throwable) exception;
+    while (cur != null) {
+      if (isSpecialException(cur)) {
+        return cur;
+      }
+      if (cur instanceof RemoteException) {
+        RemoteException re = (RemoteException) cur;
+        cur = re.unwrapRemoteException(
+            RegionOpeningException.class, RegionMovedException.class,
+            RegionTooBusyException.class);
+        if (cur == null) {
+          cur = re.unwrapRemoteException();
+        }
+        // unwrapRemoteException can return the exception given as a parameter 
when it cannot
+        //  unwrap it. In this case, there is no need to look further
+        // noinspection ObjectEquality
+        if (cur == re) {
+          return cur;
+        }
+      } else if (cur.getCause() != null) {
+        cur = cur.getCause();
+      } else {
+        return cur;
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index d11fd31..8d1f90b 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
@@ -215,28 +216,35 @@ public class TestAsyncProcess {
 
   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
 
-    public CallerWithFailure() {
+    private final IOException e;
+
+    public CallerWithFailure(IOException e) {
       super(100, 100, 9);
+      this.e = e;
     }
 
     @Override
     public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> 
callable, int callTimeout)
         throws IOException, RuntimeException {
-      throw new IOException("test");
+      throw e;
     }
   }
 
+
   static class AsyncProcessWithFailure extends MyAsyncProcess {
 
-    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
+    private final IOException ioe;
+
+    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, 
IOException ioe) {
       super(hc, conf, true);
+      this.ioe = ioe;
       serverTrackerTimeout = 1;
     }
 
     @Override
     protected RpcRetryingCaller<MultiResponse> 
createCaller(MultiServerCallable<Row> callable) {
       callsCt.incrementAndGet();
-      return new CallerWithFailure();
+      return new CallerWithFailure(ioe);
     }
   }
 
@@ -830,7 +838,7 @@ public class TestAsyncProcess {
   public void testGlobalErrors() throws IOException {
     ClusterConnection conn = new MyConnectionImpl(conf);
     BufferedMutatorImpl mutator = (BufferedMutatorImpl) 
conn.getBufferedMutator(DUMMY_TABLE);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new 
IOException("test"));
     mutator.ap = ap;
 
     Assert.assertNotNull(mutator.ap.createServerErrorTracker());
@@ -847,6 +855,27 @@ public class TestAsyncProcess {
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
   }
 
+
+  @Test
+  public void testCallQueueTooLarge() throws IOException {
+    ClusterConnection conn = new MyConnectionImpl(conf);
+    BufferedMutatorImpl mutator = (BufferedMutatorImpl) 
conn.getBufferedMutator(DUMMY_TABLE);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new 
CallQueueTooBigException());
+    mutator.ap = ap;
+
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    mutator.mutate(p);
+
+    try {
+      mutator.flush();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    // Checking that the ErrorsServers came into play and didn't make us stop 
immediately
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+  }
   /**
    * This test simulates multiple regions on 2 servers. We should have 2 multi 
requests and
    *  2 threads: 1 per server, this whatever the number of regions.

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
new file mode 100644
index 0000000..968e55c
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import com.google.protobuf.ServiceException;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("ThrowableInstanceNeverThrown")
+public class TestClientExceptionsUtil {
+
+  @Test
+  public void testFindException() throws Exception {
+    IOException ioe = new IOException("Tesst");
+    ServiceException se = new ServiceException(ioe);
+    assertEquals(ioe, ClientExceptionsUtil.findException(se));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 56424df..79b4ec8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -72,9 +72,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   }
 
   @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(final CallRunner callTask) throws 
InterruptedException {
     int queueIndex = balancer.getNextQueue();
-    queues.get(queueIndex).put(callTask);
+    return queues.get(queueIndex).offer(callTask);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 621a8ef..5e104eb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -34,6 +35,7 @@ public class FifoRpcScheduler extends RpcScheduler {
 
   private final int handlerCount;
   private final int maxQueueLength;
+  private final AtomicInteger queueSize = new AtomicInteger(0);
   private ThreadPoolExecutor executor;
 
   public FifoRpcScheduler(Configuration conf, int handlerCount) {
@@ -65,14 +67,22 @@ public class FifoRpcScheduler extends RpcScheduler {
   }
 
   @Override
-  public void dispatch(final CallRunner task) throws IOException, 
InterruptedException {
+  public boolean dispatch(final CallRunner task) throws IOException, 
InterruptedException {
+    // Executors provide no offer, so make our own.
+    int queued = queueSize.getAndIncrement();
+    if (maxQueueLength > 0 && queued >= maxQueueLength) {
+      queueSize.decrementAndGet();
+      return false;
+    }
     executor.submit(new Runnable() {
       @Override
       public void run() {
         task.setStatus(RpcServer.getStatus());
         task.run();
+        queueSize.decrementAndGet();
       }
     });
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 1be8c65..544370d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   }
 
   @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(final CallRunner callTask) throws 
InterruptedException {
     RpcServer.Call call = callTask.getCall();
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
@@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     } else {
       queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
-    queues.get(queueIndex).put(callTask);
+    return queues.get(queueIndex).offer(callTask);
   }
 
   private boolean isWriteRequest(final RequestHeader header, final Message 
param) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 27750a7..017bf39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -86,7 +86,7 @@ public abstract class RpcExecutor {
   public abstract int getQueueLength();
 
   /** Add the request to the executor queue */
-  public abstract void dispatch(final CallRunner callTask) throws 
InterruptedException;
+  public abstract boolean dispatch(final CallRunner callTask) throws 
InterruptedException;
 
   /** Returns the list of request queues */
   protected abstract List<BlockingQueue<CallRunner>> getQueues();

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index f273865..fffe7f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -58,7 +58,7 @@ public abstract class RpcScheduler {
    *
    * @param task the request to be dispatched
    */
-  public abstract void dispatch(CallRunner task) throws IOException, 
InterruptedException;
+  public abstract boolean dispatch(CallRunner task) throws IOException, 
InterruptedException;
 
   /** Retrieves length of the general queue for metrics. */
   public abstract int getGeneralQueueLength();

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3b58021..2859ea0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -67,6 +67,7 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -1159,13 +1160,6 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     }
   }
 
-  @SuppressWarnings("serial")
-  public static class CallQueueTooBigException extends IOException {
-    CallQueueTooBigException() {
-      super();
-    }
-  }
-
   /** Reads calls from a connection and queues them for handling. */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="VO_VOLATILE_INCREMENT",
@@ -1864,7 +1858,18 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
           : null;
       Call call = new Call(id, this.service, md, header, param, cellScanner, 
this, responder,
               totalRequestSize, traceInfo, this.addr);
-      scheduler.dispatch(new CallRunner(RpcServer.this, call));
+
+      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
+        callQueueSize.add(-1 * call.getSize());
+
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+        InetSocketAddress address = getListenerAddress();
+        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
+            "Call queue is full on " + (address != null ? address : "(channel 
closed)") +
+                ", too many items queued ?");
+        responder.doRespond(call);
+      }
     }
 
     private boolean authorizeConnection() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index b8e9c52..8de714d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler {
   }
 
   @Override
-  public void dispatch(CallRunner callTask) throws InterruptedException {
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
     RpcServer.Call call = callTask.getCall();
     int level = priority.getPriority(call.getHeader(), call.param, 
call.getRequestUser());
     if (priorityExecutor != null && level > highPriorityLevel) {
-      priorityExecutor.dispatch(callTask);
+      return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == 
HConstants.REPLICATION_QOS) {
-      replicationExecutor.dispatch(callTask);
+      return replicationExecutor.dispatch(callTask);
     } else {
-      callExecutor.dispatch(callTask);
+      return callExecutor.dispatch(callTask);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 41720be..1669fc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -61,6 +61,7 @@ import 
org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementatio
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -698,7 +699,7 @@ public class TestHCM {
       Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
 
       // Check that we unserialized the exception as expected
-      Throwable cause = ConnectionManager.findException(e.getCause(0));
+      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
       Assert.assertNotNull(cause);
       Assert.assertTrue(cause instanceof RegionMovedException);
     }

Reply via email to