This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 35bb4768150 HBASE-29390 Too many logs in AsyncBatchRpcRetryingCaller when hitting RegionTooBusyException (#7105)
35bb4768150 is described below

commit 35bb4768150890362b4a1a96c993dc812e3fdfd9
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Jun 20 09:53:43 2025 +0800

    HBASE-29390 Too many logs in AsyncBatchRpcRetryingCaller when hitting RegionTooBusyException (#7105)
    
    Signed-off-by: Nihal Jain <[email protected]>
    (cherry picked from commit 387e21383e80d364138264a3a1e88cfa46d3a2f9)
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  61 ++++++++--
 .../client/TestAsyncBatchRpcRetryingCaller.java    | 123 +++++++++++++++++++++
 2 files changed, 175 insertions(+), 9 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index d02d9daa5f3..fbf5285ea0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -25,11 +25,15 @@ import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -118,7 +122,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
  // we can not use HRegionLocation as the map key because the hashCode and equals method of
   // HRegionLocation only consider serverName.
-  private static final class RegionRequest {
+  // package private for testing log output
+  static final class RegionRequest {
 
     public final HRegionLocation loc;
 
@@ -212,14 +217,47 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
   }
 
-  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
+  private void logRegionsException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
     Throwable error, ServerName serverName) {
     if (tries > startLogErrorsCnt) {
       String regions =
        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
           .collect(Collectors.joining(",", "[", "]"));
-      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
-        + " failed, tries=" + tries, error);
+      LOG.warn("Process batch for {} from {} failed, tries={}", regions, serverName, tries, error);
+    }
+  }
+
+  private static final int MAX_SAMPLED_ERRORS = 3;
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/(src/test/|AsyncBatchRpcRetryingCaller).*")
+  static void logActionsException(int tries, int startLogErrorsCnt, RegionRequest regionReq,
+    IdentityHashMap<Action, Throwable> action2Error, ServerName serverName) {
+    if (tries <= startLogErrorsCnt || action2Error.isEmpty()) {
+      return;
+    }
+    if (LOG.isWarnEnabled()) {
+      StringWriter sw = new StringWriter();
+      PrintWriter action2ErrorWriter = new PrintWriter(sw);
+      action2ErrorWriter.println();
+      Iterator<Map.Entry<Action, Throwable>> iter = action2Error.entrySet().iterator();
+      for (int i = 0; i < MAX_SAMPLED_ERRORS && iter.hasNext(); i++) {
+        Map.Entry<Action, Throwable> entry = iter.next();
+        action2ErrorWriter.print(entry.getKey().getAction());
+        action2ErrorWriter.print(" => ");
+        entry.getValue().printStackTrace(action2ErrorWriter);
+      }
+      action2ErrorWriter.flush();
+      LOG.warn("Process batch for {} on {}, {}/{} actions failed, tries={}, sampled {} errors: {}",
+        regionReq.loc.getRegion().getRegionNameAsString(), serverName, action2Error.size(),
+        regionReq.actions.size(), tries, Math.min(MAX_SAMPLED_ERRORS, action2Error.size()),
+        sw.toString());
+    }
+    // if trace is enabled, we log all the details
+    if (LOG.isTraceEnabled()) {
+      action2Error.forEach((action, error) -> LOG.trace(
+        "Process action {} in batch for {} on {} failed, tries={}", 
action.getAction(),
+        regionReq.loc.getRegion().getRegionNameAsString(), serverName, tries, 
error));
     }
   }
 
@@ -297,7 +335,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   @SuppressWarnings("unchecked")
   private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
     RegionResult regionResult, List<Action> failedActions, Throwable regionException,
-    MutableBoolean retryImmediately) {
+    MutableBoolean retryImmediately, IdentityHashMap<Action, Throwable> action2Error) {
     Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
     if (result == null) {
       LOG.error("Server " + serverName + " sent us neither result nor 
exception for row '"
@@ -307,7 +345,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       failedActions.add(action);
     } else if (result instanceof Throwable) {
       Throwable error = translateException((Throwable) result);
-      logException(tries, () -> Stream.of(regionReq), error, serverName);
+      action2Error.put(action, error);
       conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
       if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
         failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
@@ -333,8 +371,13 @@ class AsyncBatchRpcRetryingCaller<T> {
       RegionResult regionResult = resp.getResults().get(rn);
       Throwable regionException = resp.getException(rn);
       if (regionResult != null) {
+        // Here we record the exceptions and log them all at once, to avoid flooding the log
+        // output if lots of actions fail. For example, if the region's memstore is full, all
+        // actions will receive a RegionTooBusyException, see HBASE-29390.
+        IdentityHashMap<Action, Throwable> action2Error = new IdentityHashMap<>();
         regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
-          regionResult, failedActions, regionException, retryImmediately));
+          regionResult, failedActions, regionException, retryImmediately, action2Error));
+        logActionsException(tries, startLogErrorsCnt, regionReq, action2Error, serverName);
       } else {
         Throwable error;
         if (regionException == null) {
@@ -344,7 +387,7 @@ class AsyncBatchRpcRetryingCaller<T> {
         } else {
           error = translateException(regionException);
         }
-        logException(tries, () -> Stream.of(regionReq), error, serverName);
+        logRegionsException(tries, () -> Stream.of(regionReq), error, serverName);
         conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
         if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
           failAll(regionReq.actions.stream(), tries, error, serverName);
@@ -453,7 +496,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
     ServerName serverName) {
     Throwable error = translateException(t);
-    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
+    logRegionsException(tries, () -> actionsByRegion.values().stream(), error, serverName);
     actionsByRegion.forEach(
       (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
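
The hunks above implement an aggregate-then-sample pattern: per-action
failures are collected into an IdentityHashMap and reported as a single WARN
entry with at most MAX_SAMPLED_ERRORS stack traces, instead of one WARN per
failed action. A minimal, self-contained sketch of the same idea follows;
SampledErrorSummary and its method names are illustrative only, not HBase API:

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the aggregate-then-sample logging pattern used by
// the patch above; not part of the HBase codebase.
public final class SampledErrorSummary {

  private static final int MAX_SAMPLED_ERRORS = 3;

  // Build one summary string for a whole batch of failures instead of
  // emitting one log entry per failed action.
  static String summarize(IdentityHashMap<String, Throwable> errors) {
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    pw.println();
    Iterator<Map.Entry<String, Throwable>> iter = errors.entrySet().iterator();
    for (int i = 0; i < MAX_SAMPLED_ERRORS && iter.hasNext(); i++) {
      Map.Entry<String, Throwable> entry = iter.next();
      pw.print(entry.getKey());
      pw.print(" => ");
      entry.getValue().printStackTrace(pw);
    }
    pw.flush();
    return errors.size() + " actions failed, sampled "
      + Math.min(MAX_SAMPLED_ERRORS, errors.size()) + " errors:" + sw;
  }

  public static void main(String[] args) {
    IdentityHashMap<String, Throwable> errors = new IdentityHashMap<>();
    // With hundreds of RegionTooBusyException failures, only the first few
    // stack traces are printed, keeping the log output bounded.
    errors.put("put 'row-a'", new IOException("region too busy"));
    errors.put("get 'row-b'", new IOException("region too busy"));
    System.out.println(summarize(errors));
  }
}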
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java
new file mode 100644
index 00000000000..0d37ee454b1
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncBatchRpcRetryingCaller.RegionRequest;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncBatchRpcRetryingCaller {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncBatchRpcRetryingCaller.class);
+
+  private org.apache.logging.log4j.core.Appender mockAppender;
+
+  @Before
+  public void setUp() {
+    mockAppender = mock(org.apache.logging.log4j.core.Appender.class);
+    when(mockAppender.getName()).thenReturn("mockAppender");
+    when(mockAppender.isStarted()).thenReturn(true);
+    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
+      .getLogger(AsyncBatchRpcRetryingCaller.class)).addAppender(mockAppender);
+
+  }
+
+  @After
+  public void tearDown() {
+    ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
+      .getLogger(AsyncBatchRpcRetryingCaller.class)).removeAppender(mockAppender);
+  }
+
+  @Test
+  public void testLogAction() {
+    AtomicReference<org.apache.logging.log4j.Level> level = new AtomicReference<>();
+    AtomicReference<String> msg = new AtomicReference<String>();
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        org.apache.logging.log4j.core.LogEvent logEvent =
+          invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class);
+        level.set(logEvent.getLevel());
+        msg.set(logEvent.getMessage().getFormattedMessage());
+        return null;
+      }
+    }).when(mockAppender).append(any(org.apache.logging.log4j.core.LogEvent.class));
+    TableName tn = TableName.valueOf("async");
+    ServerName sn = ServerName.valueOf("host", 12345, EnvironmentEdgeManager.currentTime());
+    RegionRequest request =
+      new RegionRequest(new HRegionLocation(RegionInfoBuilder.newBuilder(tn).build(), sn));
+    Action put = new Action(new Put(Bytes.toBytes("a")), 0);
+    Action get = new Action(new Get(Bytes.toBytes("b")), 1);
+    Action incr = new Action(new Increment(Bytes.toBytes("c")), 2);
+    Action del = new Action(new Delete(Bytes.toBytes("d")), 3);
+    request.actions.add(put);
+    request.actions.add(get);
+    request.actions.add(incr);
+    request.actions.add(del);
+    IdentityHashMap<Action, Throwable> action2Error = new IdentityHashMap<>();
+    AsyncBatchRpcRetryingCaller.logActionsException(1, 2, request, action2Error, sn);
+    verify(mockAppender, never()).append(any());
+    AsyncBatchRpcRetryingCaller.logActionsException(5, 4, request, action2Error, sn);
+    verify(mockAppender, never()).append(any());
+
+    action2Error.put(get, new IOException("get error"));
+    action2Error.put(incr, new IOException("incr error"));
+    AsyncBatchRpcRetryingCaller.logActionsException(5, 4, request, action2Error, sn);
+    verify(mockAppender, times(1)).append(any());
+    assertEquals(org.apache.logging.log4j.Level.WARN, level.get());
+
+    String logMsg = msg.get();
+    assertThat(logMsg,
+      startsWith("Process batch for " + 
request.loc.getRegion().getRegionNameAsString() + " on "
+        + sn.toString() + ", 2/4 actions failed, tries=5, sampled 2 errors:"));
+    assertThat(logMsg, containsString("=> java.io.IOException: get error"));
+    assertThat(logMsg, containsString("=> java.io.IOException: incr error"));
+  }
+}
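
The test above captures log output by attaching a mocked Log4j2 Appender to
the caller's Logger and asserting on the events it receives. A stripped-down
sketch of that capture technique, assuming Mockito and log4j-core on the
classpath (LogCaptureSketch is a hypothetical helper, not part of this patch):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.Logger;

// Hypothetical helper showing the log-capture technique used by the test:
// a mocked Appender records each formatted message for later assertions.
class LogCaptureSketch {

  static AtomicReference<String> attach(Class<?> clazz, Appender mockAppender) {
    // Log4j2 ignores appenders that are unnamed or not started.
    when(mockAppender.getName()).thenReturn("mockAppender");
    when(mockAppender.isStarted()).thenReturn(true);
    AtomicReference<String> lastMessage = new AtomicReference<>();
    doAnswer(invocation -> {
      LogEvent event = invocation.getArgument(0, LogEvent.class);
      lastMessage.set(event.getMessage().getFormattedMessage());
      return null;
    }).when(mockAppender).append(any(LogEvent.class));
    ((Logger) LogManager.getLogger(clazz)).addAppender(mockAppender);
    return lastMessage;
  }
}

As in the test's tearDown, the appender should be removed again after each
test so the mock does not leak into other tests' logging.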
