This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 81a3fa36f38 HBASE-29394 Possible race in 
BufferedMutatorOverAsyncBufferedMutator (#7089)
81a3fa36f38 is described below

commit 81a3fa36f389a7d8082b73e3894b2ed57c687311
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jun 12 11:45:00 2025 +0800

    HBASE-29394 Possible race in BufferedMutatorOverAsyncBufferedMutator (#7089)
    
    Signed-off-by: Nihal Jain <[email protected]>
    (cherry picked from commit c9df60777068065a3c24b13159c414f143491e1e)
---
 .../BufferedMutatorOverAsyncBufferedMutator.java   |  16 +--
 ...estBufferedMutatorOverAsyncBufferedMutator.java | 112 +++++++++++++++++++++
 2 files changed, 121 insertions(+), 7 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
index aec4a0cbf21..a17d08f3def 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
@@ -135,13 +135,15 @@ class BufferedMutatorOverAsyncBufferedMutator implements 
BufferedMutator {
       long heapSize = mutation.heapSize();
       bufferedSize.addAndGet(heapSize);
       addListener(fs.get(i), (r, e) -> {
-        futures.remove(toComplete);
-        bufferedSize.addAndGet(-heapSize);
-        if (e != null) {
-          errors.add(Pair.newPair(mutation, e));
-          toComplete.completeExceptionally(e);
-        } else {
-          toComplete.complete(r);
+        synchronized (this) {
+          futures.remove(toComplete);
+          bufferedSize.addAndGet(-heapSize);
+          if (e != null) {
+            errors.add(Pair.newPair(mutation, e));
+            toComplete.completeExceptionally(e);
+          } else {
+            toComplete.complete(r);
+          }
         }
       });
     }
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorOverAsyncBufferedMutator.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorOverAsyncBufferedMutator.java
new file mode 100644
index 00000000000..bcbaadf26dd
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorOverAsyncBufferedMutator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedStatic;
+
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestBufferedMutatorOverAsyncBufferedMutator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestBufferedMutatorOverAsyncBufferedMutator.class);
+
+  private AsyncBufferedMutator asyncMutator;
+
+  private BufferedMutatorOverAsyncBufferedMutator mutator;
+
+  private ExecutorService executor;
+
+  private MockedStatic<Pair> mockedPair;
+
+  @Before
+  public void setUp() {
+    asyncMutator = mock(AsyncBufferedMutator.class);
+    when(asyncMutator.getWriteBufferSize()).thenReturn(1024L * 1024);
+    mutator = new BufferedMutatorOverAsyncBufferedMutator(asyncMutator, (e, m) 
-> {
+      throw e;
+    });
+    executor =
+      Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setDaemon(true).build());
+    mockedPair = mockStatic(Pair.class);
+  }
+
+  @After
+  public void tearDown() {
+    mockedPair.closeOnDemand();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testRace() throws IOException {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    when(asyncMutator.mutate(anyList())).thenReturn(Arrays.asList(future));
+    mutator.mutate(new Put(Bytes.toBytes("aaa")));
+    verify(asyncMutator).mutate(anyList());
+    CountDownLatch beforeFlush = new CountDownLatch(1);
+    CountDownLatch afterFlush = new CountDownLatch(1);
+    Future<?> flushFuture = executor.submit(() -> {
+      beforeFlush.await();
+      mutator.flush();
+      afterFlush.countDown();
+      return null;
+    });
+    mockedPair.when(() -> Pair.newPair(any(), any())).then(i -> {
+      beforeFlush.countDown();
+      afterFlush.await(5, TimeUnit.SECONDS);
+      return i.callRealMethod();
+    });
+    future.completeExceptionally(new IOException("inject error"));
+    ExecutionException error = assertThrows(ExecutionException.class, () -> 
flushFuture.get());
+    assertThat(error.getCause(), 
instanceOf(RetriesExhaustedWithDetailsException.class));
+    assertEquals("inject error",
+      ((RetriesExhaustedWithDetailsException) 
error.getCause()).getCause(0).getMessage());
+  }
+}

Reply via email to