Repository: hbase Updated Branches: refs/heads/branch-1 630ad95c9 -> 421fe24e9
HBASE-15146 Don't block on Reader threads queueing to a scheduler queue Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/421fe24e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/421fe24e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/421fe24e Branch: refs/heads/branch-1 Commit: 421fe24e9bb925e6199cc02118a5314458caeb38 Parents: 630ad95 Author: Elliott Clark <[email protected]> Authored: Wed Jan 20 17:43:22 2016 -0800 Committer: Elliott Clark <[email protected]> Committed: Thu Jan 28 08:11:11 2016 -0500 ---------------------------------------------------------------------- .../hadoop/hbase/CallQueueTooBigException.java | 33 +++++++ .../hadoop/hbase/client/AsyncProcess.java | 3 +- .../hadoop/hbase/client/ConnectionManager.java | 48 +--------- .../hbase/exceptions/ClientExceptionsUtil.java | 95 ++++++++++++++++++++ .../hadoop/hbase/client/TestAsyncProcess.java | 39 ++++++-- .../exceptions/TestClientExceptionsUtil.java | 37 ++++++++ .../hbase/ipc/BalancedQueueRpcExecutor.java | 4 +- .../hadoop/hbase/ipc/FifoRpcScheduler.java | 12 ++- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 4 +- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 2 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 21 +++-- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 8 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 3 +- 14 files changed, 240 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java new file mode 100644 index 
0000000..d07c657 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CallQueueTooBigException extends IOException { + public CallQueueTooBigException() { + super(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index e895a13..4069e49e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName; 
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -1293,7 +1294,7 @@ class AsyncProcess { // Failure: retry if it's make sense else update the errors lists if (result == null || result instanceof Throwable) { Row row = sentAction.getAction(); - throwable = ConnectionManager.findException(result); + throwable = ClientExceptionsUtil.findException(result); // Register corresponding failures once per server/once per region. if (!regionFailureRegistered) { regionFailureRegistered = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 69c8767..7a298e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -2216,10 +2217,9 @@ class ConnectionManager 
{ } HRegionInfo regionInfo = oldLocation.getRegionInfo(); - Throwable cause = findException(exception); + Throwable cause = ClientExceptionsUtil.findException(exception); if (cause != null) { - if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException - || cause instanceof ThrottlingException) { + if (!ClientExceptionsUtil.isMetaClearingException(cause)) { // We know that the region is still on this region server return; } @@ -2698,46 +2698,4 @@ class ConnectionManager { } } } - - /** - * Look for an exception we know in the remote exception: - * - hadoop.ipc wrapped exceptions - * - nested exceptions - * - * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException / - * ThrottlingException - * @return null if we didn't find the exception, the exception otherwise. - */ - public static Throwable findException(Object exception) { - if (exception == null || !(exception instanceof Throwable)) { - return null; - } - Throwable cur = (Throwable) exception; - while (cur != null) { - if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException - || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException - || cur instanceof RetryImmediatelyException) { - return cur; - } - if (cur instanceof RemoteException) { - RemoteException re = (RemoteException) cur; - cur = re.unwrapRemoteException( - RegionOpeningException.class, RegionMovedException.class, - RegionTooBusyException.class); - if (cur == null) { - cur = re.unwrapRemoteException(); - } - // unwrapRemoteException can return the exception given as a parameter when it cannot - // unwrap it. 
In this case, there is no need to look further - // noinspection ObjectEquality - if (cur == re) { - return null; - } - } else { - cur = cur.getCause(); - } - } - - return null; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java new file mode 100644 index 0000000..ebf1499 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.exceptions; + +import org.apache.hadoop.hbase.CallQueueTooBigException; +import org.apache.hadoop.hbase.MultiActionResultTooLarge; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.RetryImmediatelyException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.quotas.ThrottlingException; +import org.apache.hadoop.ipc.RemoteException; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ClientExceptionsUtil { + + private ClientExceptionsUtil() {} + + public static boolean isMetaClearingException(Throwable cur) { + cur = findException(cur); + + if (cur == null) { + return true; + } + return !isSpecialException(cur) || (cur instanceof RegionMovedException); + } + + public static boolean isSpecialException(Throwable cur) { + return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException + || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException + || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException + || cur instanceof CallQueueTooBigException); + } + + + /** + * Look for an exception we know in the remote exception: + * - hadoop.ipc wrapped exceptions + * - nested exceptions + * + * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException / + * ThrottlingException + * @return null if we didn't find the exception, the exception otherwise. 
+ */ + public static Throwable findException(Object exception) { + if (exception == null || !(exception instanceof Throwable)) { + return null; + } + Throwable cur = (Throwable) exception; + while (cur != null) { + if (isSpecialException(cur)) { + return cur; + } + if (cur instanceof RemoteException) { + RemoteException re = (RemoteException) cur; + cur = re.unwrapRemoteException( + RegionOpeningException.class, RegionMovedException.class, + RegionTooBusyException.class); + if (cur == null) { + cur = re.unwrapRemoteException(); + } + // unwrapRemoteException can return the exception given as a parameter when it cannot + // unwrap it. In this case, there is no need to look further + // noinspection ObjectEquality + if (cur == re) { + return cur; + } + } else if (cur.getCause() != null) { + cur = cur.getCause(); + } else { + return cur; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d11fd31..8d1f90b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; @@ -215,28 +216,35 @@ public class TestAsyncProcess { static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{ - public CallerWithFailure() { + 
private final IOException e; + + public CallerWithFailure(IOException e) { super(100, 100, 9); + this.e = e; } @Override public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) throws IOException, RuntimeException { - throw new IOException("test"); + throw e; } } + static class AsyncProcessWithFailure extends MyAsyncProcess { - public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) { + private final IOException ioe; + + public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { super(hc, conf, true); + this.ioe = ioe; serverTrackerTimeout = 1; } @Override protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { callsCt.incrementAndGet(); - return new CallerWithFailure(); + return new CallerWithFailure(ioe); } } @@ -830,7 +838,7 @@ public class TestAsyncProcess { public void testGlobalErrors() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); mutator.ap = ap; Assert.assertNotNull(mutator.ap.createServerErrorTracker()); @@ -847,6 +855,27 @@ public class TestAsyncProcess { Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); } + + @Test + public void testCallQueueTooLarge() throws IOException { + ClusterConnection conn = new MyConnectionImpl(conf); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); + mutator.ap = ap; + + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + + Put p = createPut(1, true); + mutator.mutate(p); + + try { + mutator.flush(); + Assert.fail(); + } catch 
(RetriesExhaustedWithDetailsException expected) { + } + // Checking that the ErrorsServers came into play and didn't make us stop immediately + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); + } /** * This test simulates multiple regions on 2 servers. We should have 2 multi requests and * 2 threads: 1 per server, this whatever the number of regions. http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java new file mode 100644 index 0000000..968e55c --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.exceptions; + +import com.google.protobuf.ServiceException; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.*; + +@SuppressWarnings("ThrowableInstanceNeverThrown") +public class TestClientExceptionsUtil { + + @Test + public void testFindException() throws Exception { + IOException ioe = new IOException("Tesst"); + ServiceException se = new ServiceException(ioe); + assertEquals(ioe, ClientExceptionsUtil.findException(se)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 56424df..79b4ec8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -72,9 +72,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) throws InterruptedException { int queueIndex = balancer.getNextQueue(); - queues.get(queueIndex).put(callTask); + return queues.get(queueIndex).offer(callTask); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index 621a8ef..5e104eb 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A very simple {@code }RpcScheduler} that serves incoming requests in order. @@ -34,6 +35,7 @@ public class FifoRpcScheduler extends RpcScheduler { private final int handlerCount; private final int maxQueueLength; + private final AtomicInteger queueSize = new AtomicInteger(0); private ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { @@ -65,14 +67,22 @@ public class FifoRpcScheduler extends RpcScheduler { } @Override - public void dispatch(final CallRunner task) throws IOException, InterruptedException { + public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + // Executors provide no offer, so make our own. 
+ int queued = queueSize.getAndIncrement(); + if (maxQueueLength > 0 && queued >= maxQueueLength) { + queueSize.decrementAndGet(); + return false; + } executor.submit(new Runnable() { @Override public void run() { task.setStatus(RpcServer.getStatus()); task.run(); + queueSize.decrementAndGet(); } }); + return true; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 1be8c65..544370d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { @@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); } - queues.get(queueIndex).put(callTask); + return queues.get(queueIndex).offer(callTask); } private boolean isWriteRequest(final RequestHeader header, final Message param) { http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 27750a7..017bf39 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -86,7 +86,7 @@ public abstract class RpcExecutor { public abstract int getQueueLength(); /** Add the request to the executor queue */ - public abstract void dispatch(final CallRunner callTask) throws InterruptedException; + public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; /** Returns the list of request queues */ protected abstract List<BlockingQueue<CallRunner>> getQueues(); http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index f273865..fffe7f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -58,7 +58,7 @@ public abstract class RpcScheduler { * * @param task the request to be dispatched */ - public abstract void dispatch(CallRunner task) throws IOException, InterruptedException; + public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException; /** Retrieves length of the general queue for metrics. 
*/ public abstract int getGeneralQueueLength(); http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 3b58021..2859ea0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -67,6 +67,7 @@ import javax.security.sasl.SaslServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -1159,13 +1160,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - @SuppressWarnings("serial") - public static class CallQueueTooBigException extends IOException { - CallQueueTooBigException() { - super(); - } - } - /** Reads calls from a connection and queues them for handling. 
*/ @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="VO_VOLATILE_INCREMENT", @@ -1864,7 +1858,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { : null; Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo, this.addr); - scheduler.dispatch(new CallRunner(RpcServer.this, call)); + + if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { + callQueueSize.add(-1 * call.getSize()); + + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); + setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + (address != null ? address : "(channel closed)") + + ", too many items queued ?"); + responder.doRespond(call); + } } private boolean authorizeConnection() throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b8e9c52..8de714d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler { } @Override - public void dispatch(CallRunner callTask) throws InterruptedException { + public boolean dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); if (priorityExecutor != null && level > highPriorityLevel) { - priorityExecutor.dispatch(callTask); + 
return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { - replicationExecutor.dispatch(callTask); + return replicationExecutor.dispatch(callTask); } else { - callExecutor.dispatch(callTask); + return callExecutor.dispatch(callTask); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/421fe24e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 41720be..1669fc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementatio import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; @@ -698,7 +699,7 @@ public class TestHCM { Assert.assertArrayEquals(e.getRow(0).getRow(), ROW); // Check that we unserialized the exception as expected - Throwable cause = ConnectionManager.findException(e.getCause(0)); + Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); Assert.assertNotNull(cause); Assert.assertTrue(cause instanceof RegionMovedException); }
