This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new b26196b8b93 HBASE-28352 HTable batch does not honor
RpcThrottlingException waitInterval (#5671)
b26196b8b93 is described below
commit b26196b8b931fc99b2a9e3dc3d87d8f7addb3815
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Sun Feb 11 09:09:38 2024 -0500
HBASE-28352 HTable batch does not honor RpcThrottlingException waitInterval
(#5671)
Signed-off-by: Duo Zhang <[email protected]>
---
.../hbase/client/AsyncRequestFutureImpl.java | 28 +++++++--
.../hadoop/hbase/client/TestAsyncProcess.java | 68 ++++++++++++++++++----
.../hbase/client/TestClientOperationTimeout.java | 39 ++++++++-----
.../hadoop/hbase/quotas/TestAtomicReadQuota.java | 3 +-
4 files changed, 110 insertions(+), 28 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index dc150724504..b34ef863d56 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
@@ -416,21 +417,25 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
* timeout against the appropriate tracker, or returns false if no tracker.
*/
private boolean isOperationTimeoutExceeded() {
+ // return value of 1 is special to mean exceeded, to differentiate from 0
+ // which is no timeout. see implementation of
RetryingTimeTracker.getRemainingTime
+ return getRemainingTime() == 1;
+ }
+
+ private long getRemainingTime() {
RetryingTimeTracker currentTracker;
if (tracker != null) {
currentTracker = tracker;
} else if (currentCallable != null && currentCallable.getTracker() !=
null) {
currentTracker = currentCallable.getTracker();
} else {
- return false;
+ return 0;
}
// no-op if already started, this is just to ensure it was initialized
(usually true)
currentTracker.start();
- // return value of 1 is special to mean exceeded, to differentiate from 0
- // which is no timeout. see implementation of getRemainingTime
- return currentTracker.getRemainingTime(operationTimeout) == 1;
+ return currentTracker.getRemainingTime(operationTimeout);
}
/**
@@ -820,6 +825,8 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
long backOffTime;
if (retryImmediately) {
backOffTime = 0;
+ } else if (throwable instanceof RpcThrottlingException) {
+ backOffTime = ((RpcThrottlingException) throwable).getWaitInterval();
} else if (HBaseServerException.isServerOverloaded(throwable)) {
// Give a special check when encountering an exception indicating the
server is overloaded.
// see #HBASE-17114 and HBASE-26807
@@ -842,6 +849,19 @@ class AsyncRequestFutureImpl<CResult> implements
AsyncRequestFuture {
backOffTime, true, null, -1, -1));
}
+ long remainingTime = getRemainingTime();
+ // 1 is a special value meaning exceeded and 0 means no timeout.
+ // throw if timeout already exceeded, or if backoff is larger than
non-zero remaining
+ if (remainingTime == 1 || (remainingTime > 0 && backOffTime >
remainingTime)) {
+ OperationTimeoutExceededException ex = new
OperationTimeoutExceededException(
+ "Backoff time of " + backOffTime + "ms would exceed operation
timeout", throwable);
+ for (Action actionToFail : toReplay) {
+ manageError(actionToFail.getOriginalIndex(), actionToFail.getAction(),
+ Retry.NO_NOT_RETRIABLE, ex, null);
+ }
+ return;
+ }
+
try {
if (backOffTime > 0) {
Thread.sleep(backOffTime);
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index c36ab952ed8..245c802845a 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -66,6 +67,7 @@ import
org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -1742,16 +1744,30 @@ public class TestAsyncProcess {
testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
}
+ @Test
+ public void testRetryPauseForRpcThrottling() throws IOException {
+ long waitInterval = 500L;
+ testRetryPause(new Configuration(CONF), waitInterval, new
RpcThrottlingException(
+ RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For
test"));
+ }
+
private void testRetryPauseWhenServerIsOverloaded(HBaseServerException
exception)
throws IOException {
- Configuration conf = new Configuration(CONF);
- final long specialPause = 500L;
+ Configuration testConf = new Configuration(CONF);
+ long specialPause = 500L;
+
testConf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
+ specialPause);
+ testRetryPause(testConf, specialPause, exception);
+ }
+
+ private void testRetryPause(Configuration testConf, long expectedPause,
+ HBaseIOException exception) throws IOException {
+
final int retries = 1;
-
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
specialPause);
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+ testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
- ClusterConnection conn = new MyConnectionImpl(conf);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf,
exception);
+ ClusterConnection conn = new MyConnectionImpl(testConf);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf,
exception);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap,
DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam,
ap);
@@ -1771,9 +1787,9 @@ public class TestAsyncProcess {
long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
long expectedSleep = 0L;
for (int i = 0; i < retries; i++) {
- expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
+ expectedSleep += ConnectionUtils.getPauseTime(expectedPause, i);
// Prevent jitter in ConnectionUtils#getPauseTime to affect result
- actualSleep += (long) (specialPause * 0.01f);
+ actualSleep += (long) (expectedPause * 0.01f);
}
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " +
actualSleep + "ms");
Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually "
+ actualSleep + "ms",
@@ -1781,8 +1797,8 @@ public class TestAsyncProcess {
// check and confirm normal IOE will use the normal pause
final long normalPause =
- conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- ap = new AsyncProcessWithFailure(conn, conf, new IOException());
+ testConf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ ap = new AsyncProcessWithFailure(conn, testConf, new IOException());
bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
@@ -1806,6 +1822,38 @@ public class TestAsyncProcess {
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep
<= expectedSleep);
}
+ @Test
+ public void testFastFailIfBackoffGreaterThanRemaining() throws IOException {
+ Configuration testConf = new Configuration(CONF);
+ testConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
+ long waitInterval = 500L;
+ HBaseIOException exception = new RpcThrottlingException(
+ RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For
test");
+
+ final int retries = 1;
+ testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+
+ ClusterConnection conn = new MyConnectionImpl(testConf);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf,
exception);
+ BufferedMutatorParams bufferParam =
+ createBufferedMutatorParams(ap, DUMMY_TABLE).operationTimeout(100);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam,
ap);
+
+ Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
+
+ Put p = createPut(1, true);
+ mutator.mutate(p);
+
+ try {
+ mutator.flush();
+ Assert.fail();
+ } catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getCause(0) instanceof
OperationTimeoutExceededException);
+ assertTrue(expected.getCause(0).getMessage().startsWith("Backoff"));
+ }
+ }
+
/**
* Tests that we properly recover from exceptions that DO NOT go through
receiveGlobalFailure, due
* to updating the meta cache for the region which failed. Successful
multigets can include region
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
index ccdd153dd73..114886a587a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
@@ -44,6 +44,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@@ -161,8 +162,9 @@ public class TestClientOperationTimeout {
}
/**
- * Tests that a batch mutate and batch get on a table throws {@link
SocketTimeoutException} when
- * the operation takes longer than 'hbase.client.operation.timeout'.
+ * Tests that a batch mutate and batch get on a table throws {@link
SocketTimeoutException} or
+ * {@link OperationTimeoutExceededException} when the operation takes longer
than
+ * 'hbase.client.operation.timeout'.
*/
@Test
public void testMultiTimeout() {
@@ -174,12 +176,7 @@ public class TestClientOperationTimeout {
List<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
- try {
- TABLE.batch(puts, new Object[2]);
- Assert.fail("should not reach here");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof SocketTimeoutException);
- }
+ assertMultiException(() -> TABLE.batch(puts, new Object[2]));
Get get1 = new Get(ROW);
get1.addColumn(FAMILY, QUALIFIER);
@@ -189,11 +186,27 @@ public class TestClientOperationTimeout {
List<Get> gets = new ArrayList<>();
gets.add(get1);
gets.add(get2);
- try {
- TABLE.batch(gets, new Object[2]);
- Assert.fail("should not reach here");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof SocketTimeoutException);
+ assertMultiException(() -> TABLE.batch(gets, new Object[2]));
+ }
+
+ /**
+ * AsyncProcess has an overall waitUntilDone with a timeout, and if all
callables don't finish by
+ * then it throws a SocketTimeoutException. The callables themselves also
try to honor the
+ * operation timeout and result in OperationTimeoutExceededException
(wrapped in
+ * RetriesExhausted). The latter is the more user-friendly exception because
it contains details
+ * about which server has issues, etc. For now we need to account for both
because it's sort of a
+ * race to see which timeout exceeds first. Maybe we can replace the
waitUntilDone behavior with
+ * an interrupt in the future so we can further unify.
+ */
+ private void assertMultiException(ThrowingRunnable runnable) {
+ IOException e = Assert.assertThrows(IOException.class, runnable);
+ if (e instanceof SocketTimeoutException) {
+ return;
+ }
+ Assert.assertTrue("Expected SocketTimeoutException or
RetriesExhaustedWithDetailsException"
+ + " but was " + e.getClass(), e instanceof
RetriesExhaustedWithDetailsException);
+ for (Throwable cause : ((RetriesExhaustedWithDetailsException)
e).getCauses()) {
+ Assert.assertEquals(OperationTimeoutExceededException.class,
cause.getClass());
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
index 9217ff14b97..f2beb8f5d27 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -216,7 +216,8 @@ public class TestAtomicReadQuota {
request.run(table);
return false;
} catch (RetriesExhaustedWithDetailsException e) {
- success = e.getCauses().stream().allMatch(t -> t instanceof
RpcThrottlingException);
+ success = e.getCauses().stream().allMatch(t -> t instanceof
RpcThrottlingException
+ || t.getCause() instanceof RpcThrottlingException);
ex = e;
} catch (Exception e) {
success = e.getCause() instanceof RpcThrottlingException;