HBASE-14946 Don't allow multis to overrun the max result size. Summary: * Add VersionInfoUtil to determine whether a client has a specified version or better * Add an exception type to signal that the response should be chunked * Add client-side knowledge of retry-immediately exceptions * Add metrics for how often this happens
Test Plan: Added a unit test Differential Revision: https://reviews.facebook.net/D51771 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48e217a7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48e217a7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48e217a7 Branch: refs/heads/hbase-12439 Commit: 48e217a7db8c23501ea4934d28e57684b82d71fb Parents: c15e0af Author: Elliott Clark <ecl...@apache.org> Authored: Mon Dec 7 18:33:35 2015 -0800 Committer: Elliott Clark <ecl...@apache.org> Committed: Thu Dec 10 18:10:32 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/MultiActionResultTooLarge.java | 31 ++++ .../hadoop/hbase/RetryImmediatelyException.java | 27 ++++ .../hadoop/hbase/client/AsyncProcess.java | 86 ++++++++--- .../hbase/client/ConnectionImplementation.java | 7 +- .../org/apache/hadoop/hbase/client/Result.java | 3 + .../hbase/ipc/MetricsHBaseServerSource.java | 8 +- .../hbase/ipc/MetricsHBaseServerSourceImpl.java | 9 ++ .../hadoop/hbase/client/VersionInfoUtil.java | 63 ++++++++ .../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 + .../apache/hadoop/hbase/ipc/RpcCallContext.java | 23 ++- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 27 +++- .../master/procedure/ProcedurePrepareLatch.java | 23 +-- .../hbase/regionserver/RSRpcServices.java | 154 ++++++++++++------- .../hbase/client/TestMultiRespectsLimits.java | 102 ++++++++++++ 14 files changed, 462 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java new file mode 100644 index 0000000..d06eea1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +/** + * Exception thrown when the result needs to be chunked on the server side. + * It signals that retries should happen right away and not count against the number of + * retries because some of the multi was a success. 
+ */ +public class MultiActionResultTooLarge extends RetryImmediatelyException { + + public MultiActionResultTooLarge(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java new file mode 100644 index 0000000..1b39904 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class RetryImmediatelyException extends IOException { + public RetryImmediatelyException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..5102ec5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -126,19 +127,35 @@ class AsyncProcess { public void waitUntilDone() throws InterruptedIOException; } - /** Return value from a submit that didn't contain any requests. */ + /** + * Return value from a submit that didn't contain any requests. + */ private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { final Object[] result = new Object[0]; + @Override - public boolean hasError() { return false; } + public boolean hasError() { + return false; + } + @Override - public RetriesExhaustedWithDetailsException getErrors() { return null; } + public RetriesExhaustedWithDetailsException getErrors() { + return null; + } + @Override - public List<? extends Row> getFailedOperations() { return null; } + public List<? 
extends Row> getFailedOperations() { + return null; + } + @Override - public Object[] getResults() { return result; } + public Object[] getResults() { + return result; + } + @Override - public void waitUntilDone() throws InterruptedIOException {} + public void waitUntilDone() throws InterruptedIOException { + } }; /** Sync point for calls to multiple replicas for the same user request (Get). @@ -308,8 +325,12 @@ class AsyncProcess { * RuntimeException */ private ExecutorService getPool(ExecutorService pool) { - if (pool != null) return pool; - if (this.pool != null) return this.pool; + if (pool != null) { + return pool; + } + if (this.pool != null) { + return this.pool; + } throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); } @@ -367,7 +388,9 @@ class AsyncProcess { Row r = it.next(); HRegionLocation loc; try { - if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); + if (r == null) { + throw new IllegalArgumentException("#" + id + ", row cannot be null"); + } // Make sure we get 0-s replica. RegionLocations locs = connection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); @@ -730,10 +753,10 @@ class AsyncProcess { // Normal case: we received an answer from the server, and it's not an exception. receiveMultiAction(multiAction, server, res, numAttempt); } catch (Throwable t) { - // Something really bad happened. We are on the send thread that will now die. - LOG.error("Internal AsyncProcess #" + id + " error for " - + tableName + " processing for " + server, t); - throw new RuntimeException(t); + // Something really bad happened. We are on the send thread that will now die. 
+ LOG.error("Internal AsyncProcess #" + id + " error for " + + tableName + " processing for " + server, t); + throw new RuntimeException(t); } finally { decTaskCounters(multiAction.getRegions(), server); if (callsInProgress != null && callable != null) { @@ -752,19 +775,25 @@ class AsyncProcess { private final TableName tableName; private final AtomicLong actionsInProgress = new AtomicLong(-1); - /** The lock controls access to results. It is only held when populating results where + /** + * The lock controls access to results. It is only held when populating results where * there might be several callers (eventual consistency gets). For other requests, - * there's one unique call going on per result index. */ + * there's one unique call going on per result index. + */ private final Object replicaResultLock = new Object(); - /** Result array. Null if results are not needed. Otherwise, each index corresponds to + /** + * Result array. Null if results are not needed. Otherwise, each index corresponds to * the action index in initial actions submitted. For most request types, has null-s for * requests that are not done, and result/exception for those that are done. * For eventual-consistency gets, initially the same applies; at some point, replica calls * might be started, and ReplicaResultState is put at the corresponding indices. The * returning calls check the type to detect when this is the case. After all calls are done, - * ReplicaResultState-s are replaced with results for the user. */ + * ReplicaResultState-s are replaced with results for the user. + */ private final Object[] results; - /** Indices of replica gets in results. If null, all or no actions are replica-gets. */ + /** + * Indices of replica gets in results. If null, all or no actions are replica-gets. 
+ */ private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; @@ -779,7 +808,9 @@ class AsyncProcess { this.actionsInProgress.set(actions.size()); if (results != null) { assert needResults; - if (results.length != actions.size()) throw new AssertionError("results.length"); + if (results.length != actions.size()) { + throw new AssertionError("results.length"); + } this.results = results; for (int i = 0; i != this.results.length; ++i) { results[i] = null; @@ -1178,9 +1209,13 @@ class AsyncProcess { // We have two contradicting needs here: // 1) We want to get the new location after having slept, as it may change. // 2) We want to take into account the location when calculating the sleep time. + // 3) If all this is just because the response needed to be chunked try again FAST. // It should be possible to have some heuristics to take the right decision. Short term, // we go for one. - long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause); + boolean retryImmediately = throwable instanceof RetryImmediatelyException; + int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; + long backOffTime = retryImmediately ? 
0 : + errorsByServer.calculateBackoffTime(oldServer, pause); if (numAttempt > startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on @@ -1189,14 +1224,16 @@ class AsyncProcess { } try { - Thread.sleep(backOffTime); + if (backOffTime > 0) { + Thread.sleep(backOffTime); + } } catch (InterruptedException e) { LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); Thread.currentThread().interrupt(); return; } - groupAndSendMultiAction(toReplay, numAttempt + 1); + groupAndSendMultiAction(toReplay, nextAttemptNumber); } private void logNoResubmit(ServerName oldServer, int numAttempt, @@ -1256,6 +1293,7 @@ class AsyncProcess { // Failure: retry if it's make sense else update the errors lists if (result == null || result instanceof Throwable) { Row row = sentAction.getAction(); + throwable = ConnectionImplementation.findException(result); // Register corresponding failures once per server/once per region. if (!regionFailureRegistered) { regionFailureRegistered = true; @@ -1405,7 +1443,9 @@ class AsyncProcess { // will either see state with callCount 0 after locking it; or will not see state at all // we will replace it with the result. 
synchronized (state) { - if (state.callCount == 0) return; // someone already set the result + if (state.callCount == 0) { + return; // someone already set the result + } state.callCount = 0; } synchronized (replicaResultLock) { http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 62a7998..0ef2a17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -35,8 +35,10 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -298,7 +300,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { Throwable cur = (Throwable) exception; while (cur != null) { if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException - || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) { + || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException + || cur instanceof RetryImmediatelyException) { return cur; } if (cur instanceof RemoteException) { @@ -1929,7 +1932,7 @@ class ConnectionImplementation 
implements ClusterConnection, Closeable { Throwable cause = findException(exception); if (cause != null) { if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException - || cause instanceof ThrottlingException) { + || cause instanceof ThrottlingException || cause instanceof MultiActionResultTooLarge) { // We know that the region is still on this region server return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 702983b..d2a49c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -839,6 +839,9 @@ public class Result implements CellScannable, CellScanner { */ public static long getTotalSizeOfCells(Result result) { long size = 0; + if (result.isEmpty()) { + return size; + } for (Cell c : result.rawCells()) { size += CellUtil.estimatedHeapSizeOf(c); } http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 5cf71f3..061a672 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -74,6 +74,9 @@ public interface MetricsHBaseServerSource extends BaseSource { String 
EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException"; String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException"; String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException"; + String EXCEPTIONS_MULTI_TOO_LARGE_NAME = "exceptions.multiResponseTooLarge"; + String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " + + "rest of the requests will have to be retried."; void authorizationSuccess(); @@ -96,6 +99,7 @@ public interface MetricsHBaseServerSource extends BaseSource { void notServingRegionException(); void unknownScannerException(); void tooBusyException(); + void multiActionTooLargeException(); void sentBytes(long count); @@ -110,4 +114,6 @@ public interface MetricsHBaseServerSource extends BaseSource { void processedCall(int processingTime); void queuedAndProcessedCall(int totalTime); - } + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 8984394..487f9f5 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram; public class MetricsHBaseServerSourceImpl extends BaseSourceImpl implements MetricsHBaseServerSource { + private final MetricsHBaseServerWrapper wrapper; private final MutableCounterLong authorizationSuccesses; private final MutableCounterLong authorizationFailures; @@ -47,6 +48,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl private final 
MutableCounterLong exceptionsSanity; private final MutableCounterLong exceptionsNSRE; private final MutableCounterLong exceptionsMoved; + private final MutableCounterLong exceptionsMultiTooLarge; private MutableHistogram queueCallTime; @@ -81,6 +83,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl .newCounter(EXCEPTIONS_MOVED_NAME, EXCEPTIONS_TYPE_DESC, 0L); this.exceptionsNSRE = this.getMetricsRegistry() .newCounter(EXCEPTIONS_NSRE_NAME, EXCEPTIONS_TYPE_DESC, 0L); + this.exceptionsMultiTooLarge = this.getMetricsRegistry() + .newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L); this.authenticationSuccesses = this.getMetricsRegistry().newCounter( AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L); @@ -160,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl } @Override + public void multiActionTooLargeException() { + exceptionsMultiTooLarge.incr(); + } + + @Override public void authenticationSuccess() { authenticationSuccesses.incr(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java new file mode 100644 index 0000000..c405518 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; + + +/** + * Class to help with parsing the version info. + */ +@InterfaceAudience.Private +public final class VersionInfoUtil { + + private VersionInfoUtil() { + /* UTIL CLASS ONLY */ + } + + public static boolean currentClientHasMinimumVersion(int major, int minor) { + RpcCallContext call = RpcServer.getCurrentCall(); + HBaseProtos.VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null; + return hasMinimumVersion(versionInfo, major, minor); + } + + public static boolean hasMinimumVersion(HBaseProtos.VersionInfo versionInfo, + int major, + int minor) { + if (versionInfo != null) { + try { + String[] components = versionInfo.getVersion().split("\\."); + + int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0; + if (clientMajor != major) { + return clientMajor > major; + } + + int clientMinor = components.length > 1 ? 
Integer.parseInt(components[1]) : 0; + return clientMinor >= minor; + } catch (NumberFormatException e) { + return false; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index d276503..05bebb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc; +import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.UnknownScannerException; @@ -105,6 +106,8 @@ public class MetricsHBaseServer { source.notServingRegionException(); } else if (throwable instanceof FailedSanityCheckException) { source.failedSanityException(); + } else if (throwable instanceof MultiActionResultTooLarge) { + source.multiActionTooLargeException(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 60e5f5d..d14e9b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -19,10 +19,11 @@ package org.apache.hadoop.hbase.ipc; import java.net.InetAddress; +import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.security.User; - +@InterfaceAudience.Private public interface RpcCallContext extends Delayable { /** * Check if the caller who made this IPC call has disconnected. @@ -40,7 +41,7 @@ public interface RpcCallContext extends Delayable { * support cellblocks while fielding requests from clients that do not. * @return True if the client supports cellblocks, else return all content in pb */ - boolean isClientCellBlockSupport(); + boolean isClientCellBlockSupported(); /** * Returns the user credentials associated with the current RPC request or @@ -71,4 +72,22 @@ public interface RpcCallContext extends Delayable { * @param callback */ void setCallBack(RpcCallback callback); + + boolean isRetryImmediatelySupported(); + + /** + * The size of response cells that have been accumulated so far. + * This along with the corresponding increment call is used to ensure that multi's or + * scans dont get too excessively large + */ + long getResponseCellSize(); + + /** + * Add on the given amount to the retained cell size. + * + * This is not thread safe and not synchronized at all. If this is used by more than one thread + * then everything will break. Since this is called for every row synchronization would be too + * onerous. 
+ */ + void incrementResponseCellSize(long cellSize); } http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 0db7383..2bef247 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; @@ -317,6 +318,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private InetAddress remoteAddress; private RpcCallback callback; + private long responseCellSize = 0; + private boolean retryImmediatelySupported; + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, long size, TraceInfo tinfo, final InetAddress remoteAddress) { @@ -336,6 +340,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.tinfo = tinfo; this.user = connection.user; this.remoteAddress = remoteAddress; + this.retryImmediatelySupported = connection.retryImmediatelySupported; } /** @@ -521,7 +526,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } @Override - public boolean isClientCellBlockSupport() { + public boolean isClientCellBlockSupported() { return this.connection != null && 
this.connection.codec != null; } @@ -538,6 +543,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + public long getResponseCellSize() { + return responseCellSize; + } + + public void incrementResponseCellSize(long cellSize) { + responseCellSize += cellSize; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is @@ -578,6 +591,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public void setCallBack(RpcCallback callback) { this.callback = callback; } + + @Override + public boolean isRetryImmediatelySupported() { + return retryImmediatelySupported; + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -1264,6 +1282,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; + private boolean retryImmediatelySupported = false; + public UserGroupInformation attemptingUser = null; // user name before auth protected User user = null; protected UserGroupInformation ugi = null; @@ -1720,6 +1740,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } if (connectionHeader.hasVersionInfo()) { + // see if this connection will support RetryImmediatelyException + retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); + AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + " with version info: " + TextFormat.shortDebugString(connectionHeader.getVersionInfo())); @@ -1727,6 +1750,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + " with unknown version info"); } + + } /** 
http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index 052386a..b13e44d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -24,10 +24,8 @@ import java.util.concurrent.CountDownLatch; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo; /** * Latch used by the Master to have the prepare() sync behaviour for old @@ -44,24 +42,7 @@ public abstract class ProcedurePrepareLatch { } public static boolean hasProcedureSupport() { - return currentClientHasMinimumVersion(1, 1); - } - - private static boolean currentClientHasMinimumVersion(int major, int minor) { - RpcCallContext call = RpcServer.getCurrentCall(); - VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null; - if (versionInfo != null) { - String[] components = versionInfo.getVersion().split("\\."); - - int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0; - if (clientMajor != major) { - return clientMajor > major; - } - - int clientMinor = components.length > 1 ? 
Integer.parseInt(components[1]) : 0; - return clientMinor >= minor; - } - return false; + return VersionInfoUtil.currentClientHasMinimumVersion(1, 1); } protected abstract void countDown(final Procedure proc); http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0c9b0e6..bba38f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -437,11 +438,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private boolean isClientCellBlockSupport() { RpcCallContext context = RpcServer.getCurrentCall(); - return context != null && context.isClientCellBlockSupport(); + return context != null && context.isClientCellBlockSupported(); } private boolean isClientCellBlockSupport(RpcCallContext context) { - return context != null && context.isClientCellBlockSupport(); + return context != null && context.isClientCellBlockSupported(); } private void addResult(final MutateResponse.Builder builder, final Result result, @@ -500,13 +501,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rm = new RowMutations(action.getMutation().getRow().toByteArray()); } switch (type) { - case PUT: - 
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: + case PUT: + rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); + break; + case DELETE: + rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); + break; + default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } @@ -543,14 +544,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rm = new RowMutations(action.getMutation().getRow().toByteArray()); } switch (type) { - case PUT: - rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + case PUT: + rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); + break; + case DELETE: + rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE); @@ -655,10 +656,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. 
We should be staying aligned though the Put and Delete are deferred/batched List<ClientProtos.Action> mutations = null; + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + IOException sizeIOE = null; for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; + + if (context != null + && context.isRetryImmediatelySupported() + && context.getResponseCellSize() > maxQuotaResultSize) { + + // We're storing the exception since the exception and reason string won't + // change after the response size limit is reached. + if (sizeIOE == null ) { + // We don't need the stack un-winding so don't throw the exception. + // Throwing will kill the JVM's JIT. + // + // Instead just create the exception and then store it. + sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: " + + context.getResponseCellSize()); + + // Only report the exception once since there's only one request that + // caused the exception. Otherwise this number will dominate the exceptions count. + rpcServer.getMetrics().exception(sizeIOE); + } + + // Now that an exception is known to have been created + // use it for the response. + // + // This will create a copy in the builder. + resultOrExceptionBuilder = ResultOrException.newBuilder(). 
+ setException(ResponseConverter.buildException(sizeIOE)); + resultOrExceptionBuilder.setIndex(action.getIndex()); + builder.addResultOrException(resultOrExceptionBuilder.build()); + continue; + } if (action.hasGet()) { Get get = ProtobufUtil.toGet(action.getGet()); if (context != null) { @@ -690,22 +723,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, mutations.clear(); } switch (type) { - case APPEND: - r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); - break; - case INCREMENT: - r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); - break; - case PUT: - case DELETE: - // Collect the individual mutations and apply in a batch - if (mutations == null) { - mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount()); - } - mutations.add(action); - break; - default: - throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); + case APPEND: + r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); + break; + case INCREMENT: + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); + break; + case PUT: + case DELETE: + // Collect the individual mutations and apply in a batch + if (mutations == null) { + mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount()); + } + mutations.add(action); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } } else { throw new HBaseIOException("Unexpected Action type"); @@ -715,11 +748,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (isClientCellBlockSupport(context)) { pbResult = ProtobufUtil.toResultNoData(r); // Hard to guess the size here. Just make a rough guess. 
- if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>(); + if (cellsToReturn == null) { + cellsToReturn = new ArrayList<CellScannable>(); + } cellsToReturn.add(r); } else { pbResult = ProtobufUtil.toResult(r); } + if (context != null) { + context.incrementResponseCellSize(Result.getTotalSizeOfCells(r)); + } resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } @@ -801,8 +839,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case SUCCESS: builder.addResultOrException(getResultOrException( - ClientProtos.Result.getDefaultInstance(), index, - ((HRegion)region).getRegionStats())); + ClientProtos.Result.getDefaultInstance(), index, + ((HRegion) region).getRegionStats())); break; } } @@ -951,13 +989,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); try { rpcServer = new RpcServer(rs, name, getServices(), - bindAddress, // use final bindAddress for this server. - rs.conf, - rpcSchedulerFactory.create(rs.conf, this, rs)); - } catch(BindException be) { + bindAddress, // use final bindAddress for this server. + rs.conf, + rpcSchedulerFactory.create(rs.conf, this, rs)); + } catch (BindException be) { String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : - HConstants.REGIONSERVER_PORT; - throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + + HConstants.REGIONSERVER_PORT; + throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + "' configuration property.", be.getCause() != null ? be.getCause() : be); } @@ -2106,7 +2144,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // It is also the conduit via which we pass back data. PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; CellScanner cellScanner = controller != null ? 
controller.cellScanner(): null; - if (controller != null) controller.setCellScanner(null); + if (controller != null) { + controller.setCellScanner(null); + } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; @@ -2180,7 +2220,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } - if (processed != null) responseBuilder.setProcessed(processed); + if (processed != null) { + responseBuilder.setProcessed(processed); + } return responseBuilder.build(); } @@ -2197,10 +2239,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; - CellScanner cellScanner = controller != null? controller.cellScanner(): null; + CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; // Clear scanner so we are not holding on to reference across call. - if (controller != null) controller.setCellScanner(null); + if (controller != null) { + controller.setCellScanner(null); + } try { checkOpen(); requestCount.increment(); @@ -2448,8 +2492,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // where processing of request takes > lease expiration time. lease = regionServer.leases.removeLease(scannerName); List<Result> results = new ArrayList<Result>(); - long totalCellSize = 0; - long currentScanResultSize = 0; boolean done = false; // Call coprocessor. Get region info from scanner. 
@@ -2459,8 +2501,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!results.isEmpty()) { for (Result r : results) { for (Cell cell : r.rawCells()) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); - currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); + if (context != null) { + context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } } } @@ -2493,7 +2536,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // If the coprocessor host is adding to the result list, we cannot guarantee the // correct ordering of partial results and so we prevent partial results from being // formed. - boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; + boolean serverGuaranteesOrderOfPartials = results.isEmpty(); boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; @@ -2559,7 +2602,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!values.isEmpty()) { for (Cell cell : values) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); + if (context != null) { + context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } final boolean partial = scannerContext.partialResultFormed(); results.add(Result.create(values, null, stale, partial)); @@ -2614,9 +2659,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } region.updateReadRequestsCount(i); - region.getMetrics().updateScanNext(totalCellSize); + long responseCellSize = context != null ? 
context.getResponseCellSize() : 0; + region.getMetrics().updateScanNext(responseCellSize); if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScannerNext(totalCellSize); + regionServer.metricsRegionServer.updateScannerNext(responseCellSize); } } finally { region.closeRegionOperation(); http://git-wip-us.apache.org/repos/asf/hbase/blob/48e217a7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java new file mode 100644 index 0000000..47dd7be --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; + +/** + * This test sets the multi size WAAAAAY low and then checks to make sure that gets will still make + * progress. + */ +@Category({MediumTests.class, ClientTests.class}) +public class TestMultiRespectsLimits { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final MetricsAssertHelper METRICS_ASSERT = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + private final static byte[] FAMILY = Bytes.toBytes("D"); + public static final int MAX_SIZE = 500; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setLong( + HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + MAX_SIZE); + + // Only start one regionserver so that all regions are on the same server. 
+ TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testMultiLimits() throws Exception { + final TableName name = TableName.valueOf("testMultiLimits"); + Table t = TEST_UTIL.createTable(name, FAMILY); + TEST_UTIL.loadTable(t, FAMILY, false); + + // Split the table to make sure that the chunking happens across regions. + try (final Admin admin = TEST_UTIL.getHBaseAdmin()) { + admin.split(name); + TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return admin.getTableRegions(name).size() > 1; + } + }); + } + List<Get> gets = new ArrayList<>(MAX_SIZE); + + for (int i = 0; i < MAX_SIZE; i++) { + gets.add(new Get(HBaseTestingUtility.ROWS[i])); + } + Result[] results = t.get(gets); + assertEquals(MAX_SIZE, results.length); + RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer(); + BaseSource s = rpcServer.getMetrics().getMetricsSource(); + + // Cells from TEST_UTIL.loadTable have a length of 27. + // Multiplying by less than that gives an easy lower bound on size. + // However in reality each kv is being reported as much higher than that. + METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s); + METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", + (MAX_SIZE * 25) / MAX_SIZE, s); + } +}