This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b26196b8b93 HBASE-28352 HTable batch does not honor RpcThrottlingException waitInterval (#5671)
b26196b8b93 is described below

commit b26196b8b931fc99b2a9e3dc3d87d8f7addb3815
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Sun Feb 11 09:09:38 2024 -0500

    HBASE-28352 HTable batch does not honor RpcThrottlingException waitInterval (#5671)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hbase/client/AsyncRequestFutureImpl.java       | 28 +++++++--
 .../hadoop/hbase/client/TestAsyncProcess.java      | 68 ++++++++++++++++++----
 .../hbase/client/TestClientOperationTimeout.java   | 39 ++++++++-----
 .../hadoop/hbase/quotas/TestAtomicReadQuota.java   |  3 +-
 4 files changed, 110 insertions(+), 28 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index dc150724504..b34ef863d56 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -416,21 +417,25 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
    * timeout against the appropriate tracker, or returns false if no tracker.
    */
   private boolean isOperationTimeoutExceeded() {
+    // return value of 1 is special to mean exceeded, to differentiate from 0
+    // which is no timeout. see implementation of 
RetryingTimeTracker.getRemainingTime
+    return getRemainingTime() == 1;
+  }
+
+  private long getRemainingTime() {
     RetryingTimeTracker currentTracker;
     if (tracker != null) {
       currentTracker = tracker;
     } else if (currentCallable != null && currentCallable.getTracker() != 
null) {
       currentTracker = currentCallable.getTracker();
     } else {
-      return false;
+      return 0;
     }
 
     // no-op if already started, this is just to ensure it was initialized 
(usually true)
     currentTracker.start();
 
-    // return value of 1 is special to mean exceeded, to differentiate from 0
-    // which is no timeout. see implementation of getRemainingTime
-    return currentTracker.getRemainingTime(operationTimeout) == 1;
+    return currentTracker.getRemainingTime(operationTimeout);
   }
 
   /**
@@ -820,6 +825,8 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
     long backOffTime;
     if (retryImmediately) {
       backOffTime = 0;
+    } else if (throwable instanceof RpcThrottlingException) {
+      backOffTime = ((RpcThrottlingException) throwable).getWaitInterval();
     } else if (HBaseServerException.isServerOverloaded(throwable)) {
       // Give a special check when encountering an exception indicating the 
server is overloaded.
       // see #HBASE-17114 and HBASE-26807
@@ -842,6 +849,19 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
         backOffTime, true, null, -1, -1));
     }
 
+    long remainingTime = getRemainingTime();
+    // 1 is a special value meaning exceeded and 0 means no timeout.
+    // throw if timeout already exceeded, or if backoff is larger than 
non-zero remaining
+    if (remainingTime == 1 || (remainingTime > 0 && backOffTime > 
remainingTime)) {
+      OperationTimeoutExceededException ex = new 
OperationTimeoutExceededException(
+        "Backoff time of " + backOffTime + "ms would exceed operation 
timeout", throwable);
+      for (Action actionToFail : toReplay) {
+        manageError(actionToFail.getOriginalIndex(), actionToFail.getAction(),
+          Retry.NO_NOT_RETRIABLE, ex, null);
+      }
+      return;
+    }
+
     try {
       if (backOffTime > 0) {
         Thread.sleep(backOffTime);
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index c36ab952ed8..245c802845a 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseServerException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -66,6 +67,7 @@ import 
org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -1742,16 +1744,30 @@ public class TestAsyncProcess {
     testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
   }
 
+  @Test
+  public void testRetryPauseForRpcThrottling() throws IOException {
+    long waitInterval = 500L;
+    testRetryPause(new Configuration(CONF), waitInterval, new 
RpcThrottlingException(
+      RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For 
test"));
+  }
+
   private void testRetryPauseWhenServerIsOverloaded(HBaseServerException 
exception)
     throws IOException {
-    Configuration conf = new Configuration(CONF);
-    final long specialPause = 500L;
+    Configuration testConf = new Configuration(CONF);
+    long specialPause = 500L;
+    
testConf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
+      specialPause);
+    testRetryPause(testConf, specialPause, exception);
+  }
+
+  private void testRetryPause(Configuration testConf, long expectedPause,
+    HBaseIOException exception) throws IOException {
+
     final int retries = 1;
-    
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 
specialPause);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
 
-    ClusterConnection conn = new MyConnectionImpl(conf);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, 
exception);
+    ClusterConnection conn = new MyConnectionImpl(testConf);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, 
exception);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, 
DUMMY_TABLE);
     BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, 
ap);
 
@@ -1771,9 +1787,9 @@ public class TestAsyncProcess {
     long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
     long expectedSleep = 0L;
     for (int i = 0; i < retries; i++) {
-      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
+      expectedSleep += ConnectionUtils.getPauseTime(expectedPause, i);
       // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result
-      actualSleep += (long) (specialPause * 0.01f);
+      actualSleep += (long) (expectedPause * 0.01f);
     }
     LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + 
actualSleep + "ms");
     Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " 
+ actualSleep + "ms",
@@ -1781,8 +1797,8 @@ public class TestAsyncProcess {
 
     // check and confirm normal IOE will use the normal pause
     final long normalPause =
-      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-    ap = new AsyncProcessWithFailure(conn, conf, new IOException());
+      testConf.getLong(HConstants.HBASE_CLIENT_PAUSE, 
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    ap = new AsyncProcessWithFailure(conn, testConf, new IOException());
     bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
     Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
@@ -1806,6 +1822,38 @@ public class TestAsyncProcess {
     Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep 
<= expectedSleep);
   }
 
+  @Test
+  public void testFastFailIfBackoffGreaterThanRemaining() throws IOException {
+    Configuration testConf = new Configuration(CONF);
+    testConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
+    long waitInterval = 500L;
+    HBaseIOException exception = new RpcThrottlingException(
+      RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For 
test");
+
+    final int retries = 1;
+    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+
+    ClusterConnection conn = new MyConnectionImpl(testConf);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, 
exception);
+    BufferedMutatorParams bufferParam =
+      createBufferedMutatorParams(ap, DUMMY_TABLE).operationTimeout(100);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, 
ap);
+
+    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    mutator.mutate(p);
+
+    try {
+      mutator.flush();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getCause(0) instanceof 
OperationTimeoutExceededException);
+      assertTrue(expected.getCause(0).getMessage().startsWith("Backoff"));
+    }
+  }
+
   /**
    * Tests that we properly recover from exceptions that DO NOT go through 
receiveGlobalFailure, due
    * to updating the meta cache for the region which failed. Successful 
multigets can include region
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
index ccdd153dd73..114886a587a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java
@@ -44,6 +44,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@@ -161,8 +162,9 @@ public class TestClientOperationTimeout {
   }
 
   /**
-   * Tests that a batch mutate and batch get on a table throws {@link 
SocketTimeoutException} when
-   * the operation takes longer than 'hbase.client.operation.timeout'.
+   * Tests that a batch mutate and batch get on a table throws {@link 
SocketTimeoutException} or
+   * {@link OperationTimeoutExceededException} when the operation takes longer 
than
+   * 'hbase.client.operation.timeout'.
    */
   @Test
   public void testMultiTimeout() {
@@ -174,12 +176,7 @@ public class TestClientOperationTimeout {
     List<Put> puts = new ArrayList<>();
     puts.add(put1);
     puts.add(put2);
-    try {
-      TABLE.batch(puts, new Object[2]);
-      Assert.fail("should not reach here");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof SocketTimeoutException);
-    }
+    assertMultiException(() -> TABLE.batch(puts, new Object[2]));
 
     Get get1 = new Get(ROW);
     get1.addColumn(FAMILY, QUALIFIER);
@@ -189,11 +186,27 @@ public class TestClientOperationTimeout {
     List<Get> gets = new ArrayList<>();
     gets.add(get1);
     gets.add(get2);
-    try {
-      TABLE.batch(gets, new Object[2]);
-      Assert.fail("should not reach here");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof SocketTimeoutException);
+    assertMultiException(() -> TABLE.batch(gets, new Object[2]));
+  }
+
+  /**
+   * AsyncProcess has an overall waitUntilDone with a timeout, and if all 
callables don't finish by
+   * then it throws a SocketTimeoutException. The callables themselves also 
try to honor the
+   * operation timeout and result in OperationTimeoutExceededException 
(wrapped in
+   * RetriesExhausted). The latter is the more user-friendly exception because 
it contains details
+   * about which server has issues, etc. For now we need to account for both 
because it's sort of a
+   * race to see which timeout exceeds first. Maybe we can replace the 
waitUntilDone behavior with
+   * an interrupt in the future so we can further unify.
+   */
+  private void assertMultiException(ThrowingRunnable runnable) {
+    IOException e = Assert.assertThrows(IOException.class, runnable);
+    if (e instanceof SocketTimeoutException) {
+      return;
+    }
+    Assert.assertTrue("Expected SocketTimeoutException or 
RetriesExhaustedWithDetailsException"
+      + " but was " + e.getClass(), e instanceof 
RetriesExhaustedWithDetailsException);
+    for (Throwable cause : ((RetriesExhaustedWithDetailsException) 
e).getCauses()) {
+      Assert.assertEquals(OperationTimeoutExceededException.class, 
cause.getClass());
     }
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
index 9217ff14b97..f2beb8f5d27 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -216,7 +216,8 @@ public class TestAtomicReadQuota {
           request.run(table);
           return false;
         } catch (RetriesExhaustedWithDetailsException e) {
-          success = e.getCauses().stream().allMatch(t -> t instanceof 
RpcThrottlingException);
+          success = e.getCauses().stream().allMatch(t -> t instanceof 
RpcThrottlingException
+            || t.getCause() instanceof RpcThrottlingException);
           ex = e;
         } catch (Exception e) {
           success = e.getCause() instanceof RpcThrottlingException;

Reply via email to