This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 35bb4768150 HBASE-29390 Too many logs in AsyncBatchRpcRetryingCaller
when hitting RegionTooBusyException (#7105)
35bb4768150 is described below
commit 35bb4768150890362b4a1a96c993dc812e3fdfd9
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Jun 20 09:53:43 2025 +0800
HBASE-29390 Too many logs in AsyncBatchRpcRetryingCaller when hitting
RegionTooBusyException (#7105)
Signed-off-by: Nihal Jain <[email protected]>
(cherry picked from commit 387e21383e80d364138264a3a1e88cfa46d3a2f9)
---
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 61 ++++++++--
.../client/TestAsyncBatchRpcRetryingCaller.java | 123 +++++++++++++++++++++
2 files changed, 175 insertions(+), 9 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index d02d9daa5f3..fbf5285ea0c 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -25,11 +25,15 @@ import static
org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static
org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -118,7 +122,8 @@ class AsyncBatchRpcRetryingCaller<T> {
// we can not use HRegionLocation as the map key because the hashCode and
equals method of
// HRegionLocation only consider serverName.
- private static final class RegionRequest {
+ // package private for testing log output
+ static final class RegionRequest {
public final HRegionLocation loc;
@@ -212,14 +217,47 @@ class AsyncBatchRpcRetryingCaller<T> {
}
}
- private void logException(int tries, Supplier<Stream<RegionRequest>>
regionsSupplier,
+ private void logRegionsException(int tries, Supplier<Stream<RegionRequest>>
regionsSupplier,
Throwable error, ServerName serverName) {
if (tries > startLogErrorsCnt) {
String regions =
regionsSupplier.get().map(r -> "'" +
r.loc.getRegion().getRegionNameAsString() + "'")
.collect(Collectors.joining(",", "[", "]"));
- LOG.warn("Process batch for " + regions + " in " + tableName + " from "
+ serverName
- + " failed, tries=" + tries, error);
+ LOG.warn("Process batch for {} from {} failed, tries={}", regions,
serverName, tries, error);
+ }
+ }
+
+ private static final int MAX_SAMPLED_ERRORS = 3;
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/(src/test/|AsyncBatchRpcRetryingCaller).*")
+ static void logActionsException(int tries, int startLogErrorsCnt,
RegionRequest regionReq,
+ IdentityHashMap<Action, Throwable> action2Error, ServerName serverName) {
+ if (tries <= startLogErrorsCnt || action2Error.isEmpty()) {
+ return;
+ }
+ if (LOG.isWarnEnabled()) {
+ StringWriter sw = new StringWriter();
+ PrintWriter action2ErrorWriter = new PrintWriter(sw);
+ action2ErrorWriter.println();
+ Iterator<Map.Entry<Action, Throwable>> iter =
action2Error.entrySet().iterator();
+ for (int i = 0; i < MAX_SAMPLED_ERRORS && iter.hasNext(); i++) {
+ Map.Entry<Action, Throwable> entry = iter.next();
+ action2ErrorWriter.print(entry.getKey().getAction());
+ action2ErrorWriter.print(" => ");
+ entry.getValue().printStackTrace(action2ErrorWriter);
+ }
+ action2ErrorWriter.flush();
+ LOG.warn("Process batch for {} on {}, {}/{} actions failed, tries={},
sampled {} errors: {}",
+ regionReq.loc.getRegion().getRegionNameAsString(), serverName,
action2Error.size(),
+ regionReq.actions.size(), tries, Math.min(MAX_SAMPLED_ERRORS,
action2Error.size()),
+ sw.toString());
+ }
+ // if trace is enabled, we log all the details
+ if (LOG.isTraceEnabled()) {
+ action2Error.forEach((action, error) -> LOG.trace(
+ "Process action {} in batch for {} on {} failed, tries={}",
action.getAction(),
+ regionReq.loc.getRegion().getRegionNameAsString(), serverName, tries,
error));
}
}
@@ -297,7 +335,7 @@ class AsyncBatchRpcRetryingCaller<T> {
@SuppressWarnings("unchecked")
private void onComplete(Action action, RegionRequest regionReq, int tries,
ServerName serverName,
RegionResult regionResult, List<Action> failedActions, Throwable
regionException,
- MutableBoolean retryImmediately) {
+ MutableBoolean retryImmediately, IdentityHashMap<Action, Throwable>
action2Error) {
Object result =
regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
LOG.error("Server " + serverName + " sent us neither result nor
exception for row '"
@@ -307,7 +345,7 @@ class AsyncBatchRpcRetryingCaller<T> {
failedActions.add(action);
} else if (result instanceof Throwable) {
Throwable error = translateException((Throwable) result);
- logException(tries, () -> Stream.of(regionReq), error, serverName);
+ action2Error.put(action, error);
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
@@ -333,8 +371,13 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult = resp.getResults().get(rn);
Throwable regionException = resp.getException(rn);
if (regionResult != null) {
+ // Here we record the exceptions and log them all at once, to avoid flooding
log output if lots of
+ // actions fail. For example, if the region's memstore is full,
all actions will
+ // receive a RegionTooBusyException, see HBASE-29390.
+ IdentityHashMap<Action, Throwable> action2Error = new
IdentityHashMap<>();
regionReq.actions.forEach(action -> onComplete(action, regionReq,
tries, serverName,
- regionResult, failedActions, regionException, retryImmediately));
+ regionResult, failedActions, regionException, retryImmediately,
action2Error));
+ logActionsException(tries, startLogErrorsCnt, regionReq, action2Error,
serverName);
} else {
Throwable error;
if (regionException == null) {
@@ -344,7 +387,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else {
error = translateException(regionException);
}
- logException(tries, () -> Stream.of(regionReq), error, serverName);
+ logRegionsException(tries, () -> Stream.of(regionReq), error,
serverName);
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
@@ -453,7 +496,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries,
Throwable t,
ServerName serverName) {
Throwable error = translateException(t);
- logException(tries, () -> actionsByRegion.values().stream(), error,
serverName);
+ logRegionsException(tries, () -> actionsByRegion.values().stream(), error,
serverName);
actionsByRegion.forEach(
(rn, regionReq) ->
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java
new file mode 100644
index 00000000000..0d37ee454b1
--- /dev/null
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBatchRpcRetryingCaller.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import
org.apache.hadoop.hbase.client.AsyncBatchRpcRetryingCaller.RegionRequest;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncBatchRpcRetryingCaller {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncBatchRpcRetryingCaller.class);
+
+ private org.apache.logging.log4j.core.Appender mockAppender;
+
+ @Before
+ public void setUp() {
+ mockAppender = mock(org.apache.logging.log4j.core.Appender.class);
+ when(mockAppender.getName()).thenReturn("mockAppender");
+ when(mockAppender.isStarted()).thenReturn(true);
+ ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
+ .getLogger(AsyncBatchRpcRetryingCaller.class)).addAppender(mockAppender);
+
+ }
+
+ @After
+ public void tearDown() {
+ ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
+
.getLogger(AsyncBatchRpcRetryingCaller.class)).removeAppender(mockAppender);
+ }
+
+ @Test
+ public void testLogAction() {
+ AtomicReference<org.apache.logging.log4j.Level> level = new
AtomicReference<>();
+ AtomicReference<String> msg = new AtomicReference<String>();
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ org.apache.logging.log4j.core.LogEvent logEvent =
+ invocation.getArgument(0,
org.apache.logging.log4j.core.LogEvent.class);
+ level.set(logEvent.getLevel());
+ msg.set(logEvent.getMessage().getFormattedMessage());
+ return null;
+ }
+
}).when(mockAppender).append(any(org.apache.logging.log4j.core.LogEvent.class));
+ TableName tn = TableName.valueOf("async");
+ ServerName sn = ServerName.valueOf("host", 12345,
EnvironmentEdgeManager.currentTime());
+ RegionRequest request =
+ new RegionRequest(new
HRegionLocation(RegionInfoBuilder.newBuilder(tn).build(), sn));
+ Action put = new Action(new Put(Bytes.toBytes("a")), 0);
+ Action get = new Action(new Get(Bytes.toBytes("b")), 1);
+ Action incr = new Action(new Increment(Bytes.toBytes("c")), 2);
+ Action del = new Action(new Delete(Bytes.toBytes("d")), 3);
+ request.actions.add(put);
+ request.actions.add(get);
+ request.actions.add(incr);
+ request.actions.add(del);
+ IdentityHashMap<Action, Throwable> action2Error = new IdentityHashMap<>();
+ AsyncBatchRpcRetryingCaller.logActionsException(1, 2, request,
action2Error, sn);
+ verify(mockAppender, never()).append(any());
+ AsyncBatchRpcRetryingCaller.logActionsException(5, 4, request,
action2Error, sn);
+ verify(mockAppender, never()).append(any());
+
+ action2Error.put(get, new IOException("get error"));
+ action2Error.put(incr, new IOException("incr error"));
+ AsyncBatchRpcRetryingCaller.logActionsException(5, 4, request,
action2Error, sn);
+ verify(mockAppender, times(1)).append(any());
+ assertEquals(org.apache.logging.log4j.Level.WARN, level.get());
+
+ String logMsg = msg.get();
+ assertThat(logMsg,
+ startsWith("Process batch for " +
request.loc.getRegion().getRegionNameAsString() + " on "
+ + sn.toString() + ", 2/4 actions failed, tries=5, sampled 2 errors:"));
+ assertThat(logMsg, containsString("=> java.io.IOException: get error"));
+ assertThat(logMsg, containsString("=> java.io.IOException: incr error"));
+ }
+}