This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a38281707f8 HBASE-29645 AsyncBufferedMutatorImpl concurrency improvement
a38281707f8 is described below

commit a38281707f896b170dfe121367e4fff3e508c6ff
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 4 22:02:50 2025 +0800

    HBASE-29645 AsyncBufferedMutatorImpl concurrency improvement
    
    Close #7516
    Close #7363
    
    Co-authored-by: Duo Zhang <[email protected]>
    Co-authored-by: David Manning <[email protected]>
    
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit 6ace9b895954a0fac67f70180078658c20b565da)
---
 .../hbase/client/AsyncBufferedMutatorImpl.java     | 154 ++++++++++++++++-----
 .../hbase/master/assignment/AssignmentManager.java |   4 +-
 .../hbase/client/TestAsyncBufferMutator.java       |  23 +--
 3 files changed, 135 insertions(+), 46 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index e5500b0977b..452318c1ef5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -27,8 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -46,6 +45,18 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
 
+  private static final int INITIAL_CAPACITY = 100;
+
+  protected static class Batch {
+    final ArrayList<Mutation> toSend;
+    final ArrayList<CompletableFuture<Void>> toComplete;
+
+    Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>> toComplete) {
+      this.toSend = toSend;
+      this.toComplete = toComplete;
+    }
+  }
+
   private final HashedWheelTimer periodicalFlushTimer;
 
   private final AsyncTable<?> table;
@@ -58,16 +69,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private final int maxMutations;
 
-  private List<Mutation> mutations = new ArrayList<>();
+  private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY);
 
-  private List<CompletableFuture<Void>> futures = new ArrayList<>();
+  private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY);
 
   private long bufferedSize;
 
-  private boolean closed;
+  private volatile boolean closed;
 
+  // Accessed by tests
   Timeout periodicFlushTask;
 
+  // Accessed by tests
+  final ReentrantLock lock = new ReentrantLock();
+
   AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
     long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
     this.periodicalFlushTimer = periodicalFlushTimer;
@@ -88,23 +103,53 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     return table.getConfiguration();
   }
 
-  // will be overridden in test
-  protected void internalFlush() {
+  /**
+   * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares
+   * this mutator to accept a new batch.
+   * <p>
+   * The {@link #lock} must be acquired before calling this method. Cancels any pending
+   * {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps
+   * the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch},
+   * replaces them with fresh lists, and resets {@link #bufferedSize} to zero.
+   * <p>
+   * If there is nothing buffered, returns {@code null} so callers can skip sending work.
+   * <p>
+   * Protected so it can be overridden in tests.
+   * @return a {@link Batch} containing the drained mutations and futures, or {@code null} if empty
+   */
+  protected Batch drainBatch() {
+    ArrayList<Mutation> toSend;
+    ArrayList<CompletableFuture<Void>> toComplete;
+    // Cancel the flush task if it is pending.
     if (periodicFlushTask != null) {
       periodicFlushTask.cancel();
       periodicFlushTask = null;
     }
-    List<Mutation> toSend = this.mutations;
+    toSend = this.mutations;
     if (toSend.isEmpty()) {
-      return;
+      return null;
     }
-    List<CompletableFuture<Void>> toComplete = this.futures;
+    toComplete = this.futures;
     assert toSend.size() == toComplete.size();
-    this.mutations = new ArrayList<>();
-    this.futures = new ArrayList<>();
+    this.mutations = new ArrayList<>(INITIAL_CAPACITY);
+    this.futures = new ArrayList<>(INITIAL_CAPACITY);
     bufferedSize = 0L;
-    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
-    for (CompletableFuture<?> future : table.batch(toSend)) {
+    return new Batch(toSend, toComplete);
+  }
+
+  /**
+   * Sends a previously drained {@link Batch} and wires the user-visible completion futures to the
+   * underlying results returned by {@link AsyncTable#batch(List)}.
+   * <p>
+   * Preserves the one-to-one, in-order mapping between mutations and their corresponding futures.
+   * @param batch the drained batch to send; may be {@code null}
+   */
+  private void sendBatch(Batch batch) {
+    if (batch == null) {
+      return;
+    }
+    Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator();
+    for (CompletableFuture<?> future : table.batch(batch.toSend)) {
       CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
       addListener(future, (r, e) -> {
         if (e != null) {
@@ -118,9 +163,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   @Override
   public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
-    List<CompletableFuture<Void>> futures =
-      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
-        .collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
+    for (int i = 0, n = mutations.size(); i < n; i++) {
+      futures.add(new CompletableFuture<>());
+    }
+    if (closed) {
+      IOException ioe = new IOException("Already closed");
+      futures.forEach(f -> f.completeExceptionally(ioe));
+      return futures;
+    }
     long heapSize = 0;
     for (Mutation mutation : mutations) {
       heapSize += mutation.heapSize();
@@ -128,23 +179,27 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
         validatePut((Put) mutation, maxKeyValueSize);
       }
     }
-    synchronized (this) {
-      if (closed) {
-        IOException ioe = new IOException("Already closed");
-        futures.forEach(f -> f.completeExceptionally(ioe));
-        return futures;
-      }
+    Batch batch = null;
+    lock.lock();
+    try {
       if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
         periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
-          synchronized (AsyncBufferedMutatorImpl.this) {
-            // confirm that we are still valid, if there is already an internalFlush call before us,
-            // then we should not execute any more. And in internalFlush we will set periodicFlush
+          Batch flushBatch = null;
+          lock.lock();
+          try {
+            // confirm that we are still valid, if there is already a drainBatch call before us,
+            // then we should not execute anymore. And in drainBatch we will set periodicFlush
             // to null, and since we may schedule a new one, so here we check whether the references
             // are equal.
             if (timeout == periodicFlushTask) {
               periodicFlushTask = null;
-              internalFlush();
+              flushBatch = drainBatch(); // Drains under lock
             }
+          } finally {
+            lock.unlock();
+          }
+          if (flushBatch != null) {
+            sendBatch(flushBatch); // Sends outside of lock
           }
         }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
       }
@@ -153,24 +208,55 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
       bufferedSize += heapSize;
       if (bufferedSize >= writeBufferSize) {
         LOG.trace("Flushing because write buffer size {} reached", 
writeBufferSize);
-        internalFlush();
+        // drain now and send after releasing the lock
+        batch = drainBatch();
       } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
         LOG.trace("Flushing because max mutations {} reached", maxMutations);
-        internalFlush();
+        batch = drainBatch();
       }
+    } finally {
+      lock.unlock();
+    }
+    // Send outside of lock
+    if (batch != null) {
+      sendBatch(batch);
     }
     return futures;
   }
 
+  // The only difference between flush and close is that we will set closed to true before sending
+  // out the batch to prevent further flush or close
+  private void flushOrClose(boolean close) {
+    Batch batch = null;
+    if (!closed) {
+      lock.lock();
+      try {
+        if (!closed) {
+          // Drains under lock
+          batch = drainBatch();
+          if (close) {
+            closed = true;
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    // Send the batch
+    if (batch != null) {
+      // Sends outside of lock
+      sendBatch(batch);
+    }
+  }
+
   @Override
-  public synchronized void flush() {
-    internalFlush();
+  public void flush() {
+    flushOrClose(false);
   }
 
   @Override
-  public synchronized void close() {
-    internalFlush();
-    closed = true;
+  public void close() {
+    flushOrClose(true);
   }
 
   @Override
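
For reference, the core concurrency pattern in the change above is: drain the shared buffer while holding the lock, then hand the drained batch to the RPC layer only after the lock has been released, so completion callbacks never run under the mutator's lock. Below is a minimal, standalone sketch of that drain-then-send shape; the class and names (DrainThenSendExample, addAndMaybeFlush, etc.) are illustrative only and not part of HBase.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only, not the HBase class: it mirrors the drain-under-lock,
// send-outside-lock split that this change introduces in AsyncBufferedMutatorImpl.
public class DrainThenSendExample {

  private final ReentrantLock lock = new ReentrantLock();
  private ArrayList<String> buffered = new ArrayList<>();

  // Swap out the shared buffer while holding the lock; return null when nothing is buffered.
  private List<String> drainBatch() {
    if (buffered.isEmpty()) {
      return null;
    }
    List<String> toSend = buffered;
    buffered = new ArrayList<>();
    return toSend;
  }

  // The potentially slow send runs with no lock held, so concurrent writers are not blocked
  // and completion callbacks can never deadlock against the buffer lock.
  private void sendBatch(List<String> batch) {
    if (batch == null) {
      return;
    }
    System.out.println("sending " + batch.size() + " buffered items: " + batch);
  }

  public void addAndMaybeFlush(String item, int maxBuffered) {
    List<String> batch = null;
    lock.lock();
    try {
      buffered.add(item);
      if (buffered.size() >= maxBuffered) {
        batch = drainBatch(); // drain under the lock
      }
    } finally {
      lock.unlock();
    }
    sendBatch(batch); // send outside the lock
  }

  public static void main(String[] args) {
    DrainThenSendExample example = new DrainThenSendExample();
    for (int i = 0; i < 5; i++) {
      example.addAndMaybeFlush("item-" + i, 3); // drains and sends once the third item arrives
    }
  }
}

The patch follows the same shape: flushOrClose() and the periodic flush task call drainBatch() under the ReentrantLock and sendBatch() after unlocking, while the volatile closed flag lets mutate() reject work on a closed mutator without taking the lock at all.
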
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index c925a9222cc..d88c1e7b86f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -2315,8 +2315,8 @@ public class AssignmentManager {
         node.crashed(scp.getSubmittedTime());
         regionInTransitionTracker.regionCrashed(node);
       } else {
-        LOG.warn("Region {} should be on crashed region server {} but is 
recorded on {}", regionInfo,
-          crashedServerName, node.getRegionLocation());
+        LOG.warn("Region {} should be on crashed region server {} but is 
recorded on {}",
+          regionInfo, crashedServerName, node.getRegionLocation());
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 2802c77b5dd..4ef62b1bc36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -258,7 +258,7 @@ public class TestAsyncBufferMutator {
 
   private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
 
-    private int flushCount;
+    private int drainCount;
 
     AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
       long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
@@ -267,9 +267,9 @@ public class TestAsyncBufferMutator {
     }
 
     @Override
-    protected void internalFlush() {
-      flushCount++;
-      super.internalFlush();
+    protected Batch drainBatch() {
+      drainCount++;
+      return super.drainBatch();
     }
   }
 
@@ -284,16 +284,19 @@ public class TestAsyncBufferMutator {
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task
       assertNotNull(task);
-      synchronized (mutator) {
-        // synchronized on mutator to prevent periodic flush to be executed
+      // get the lock to prevent the periodic flush from being executed
+      mutator.lock.lock();
+      try {
         Thread.sleep(500);
         // the timeout should be issued
         assertTrue(task.isExpired());
-        // but no flush is issued as we hold the lock
-        assertEquals(0, mutator.flushCount);
+        // but no drain is issued as we hold the lock
+        assertEquals(0, mutator.drainCount);
         assertFalse(future.isDone());
-        // manually flush, then release the lock
+        // manually flush and drain, then release the lock
         mutator.flush();
+      } finally {
+        mutator.lock.unlock();
       }
       // this is a bit deep into the implementation in netty but anyway let's add a check here to
       // confirm that an issued timeout can not be canceled by netty framework.
@@ -303,7 +306,7 @@ public class TestAsyncBufferMutator {
       AsyncTable<?> table = CONN.getTable(TABLE_NAME);
       assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
       // only the manual flush, the periodic flush should have been canceled by us
-      assertEquals(1, mutator.flushCount);
+      assertEquals(1, mutator.drainCount);
     }
   }
 }
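
One caller-visible consequence of the change: mutate() now checks the volatile closed flag up front and immediately completes the returned futures exceptionally with IOException("Already closed"), without taking the lock. A rough caller-side sketch of that behavior is below; the AsyncConnection, the table name "test", the family/qualifier bytes and the class/method names are assumptions for illustration, not part of the patch.

import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MutateAfterCloseExample {

  // The connection, table name, family and qualifier here are hypothetical.
  static void demo(AsyncConnection conn) throws InterruptedException {
    AsyncBufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("test"));
    mutator.mutate(new Put(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("f"),
      Bytes.toBytes("q"), Bytes.toBytes("v")));
    mutator.close(); // drains and sends the buffered Put, then marks the mutator closed

    try {
      // After close(), mutate() no longer buffers anything; the returned future is
      // already completed exceptionally with IOException("Already closed").
      mutator.mutate(new Put(Bytes.toBytes("row2")).addColumn(Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"))).get();
    } catch (ExecutionException e) {
      System.out.println("rejected after close: " + e.getCause());
    }
  }
}
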
