This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit eac7db2ac44188da5f7d8342251502cae1ce962f Author: Andrew Purtell <[email protected]> AuthorDate: Thu Dec 4 22:02:50 2025 +0800 HBASE-29645 AsyncBufferedMutatorImpl concurrency improvement Close #7516 Close #7363 Co-authored-by: Duo Zhang <[email protected]> Co-authored-by: David Manning <[email protected]> Signed-off-by: Duo Zhang <[email protected]> (cherry picked from commit 6ace9b895954a0fac67f70180078658c20b565da) --- .../hbase/client/AsyncBufferedMutatorImpl.java | 154 ++++++++++++++++----- .../hbase/client/TestAsyncBufferMutator.java | 23 +-- 2 files changed, 133 insertions(+), 44 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index 0e7e1b91e44..452318c1ef5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -27,8 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -46,6 +45,18 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class); + private static final int INITIAL_CAPACITY = 100; + + protected static class Batch { + final ArrayList<Mutation> toSend; + final ArrayList<CompletableFuture<Void>> toComplete; + + Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>> toComplete) { + this.toSend = toSend; + this.toComplete = toComplete; + } + } + private final HashedWheelTimer periodicalFlushTimer; 
private final AsyncTable<?> table; @@ -58,16 +69,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { private final int maxMutations; - private List<Mutation> mutations = new ArrayList<>(); + private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY); - private List<CompletableFuture<Void>> futures = new ArrayList<>(); + private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY); private long bufferedSize; - private boolean closed; + private volatile boolean closed; + // Accessed by tests Timeout periodicFlushTask; + // Accessed by tests + final ReentrantLock lock = new ReentrantLock(); + AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table, long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) { this.periodicalFlushTimer = periodicalFlushTimer; @@ -88,23 +103,53 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { return table.getConfiguration(); } - // will be overridden in test - protected void internalFlush() { + /** + * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares + * this mutator to accept a new batch. + * <p> + * The {@link #lock} must be acquired before calling this method. Cancels any pending + * {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps + * the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch}, + * replaces them with fresh lists, and resets {@link #bufferedSize} to zero. + * <p> + * If there is nothing buffered, returns {@code null} so callers can skip sending work. + * <p> + * Protected for being overridden in tests. + * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty + */ + protected Batch drainBatch() { + ArrayList<Mutation> toSend; + ArrayList<CompletableFuture<Void>> toComplete; + // Cancel the flush task if it is pending. 
if (periodicFlushTask != null) { periodicFlushTask.cancel(); periodicFlushTask = null; } - List<Mutation> toSend = this.mutations; + toSend = this.mutations; if (toSend.isEmpty()) { - return; + return null; } - List<CompletableFuture<Void>> toComplete = this.futures; + toComplete = this.futures; assert toSend.size() == toComplete.size(); - this.mutations = new ArrayList<>(); - this.futures = new ArrayList<>(); + this.mutations = new ArrayList<>(INITIAL_CAPACITY); + this.futures = new ArrayList<>(INITIAL_CAPACITY); bufferedSize = 0L; - Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator(); - for (CompletableFuture<?> future : table.batch(toSend)) { + return new Batch(toSend, toComplete); + } + + /** + * Sends a previously drained {@link Batch} and wires the user-visible completion futures to the + * underlying results returned by {@link AsyncTable#batch(List)}. + * <p> + * Preserves the one-to-one, in-order mapping between mutations and their corresponding futures. + * @param batch the drained batch to send; may be {@code null} + */ + private void sendBatch(Batch batch) { + if (batch == null) { + return; + } + Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator(); + for (CompletableFuture<?> future : table.batch(batch.toSend)) { CompletableFuture<Void> toCompleteFuture = toCompleteIter.next(); addListener(future, (r, e) -> { if (e != null) { @@ -118,9 +163,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { @Override public List<CompletableFuture<Void>> mutate(List<? 
extends Mutation> mutations) { - List<CompletableFuture<Void>> futures = - Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size()) - .collect(Collectors.toList()); + List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size()); + for (int i = 0, n = mutations.size(); i < n; i++) { + futures.add(new CompletableFuture<>()); + } + if (closed) { + IOException ioe = new IOException("Already closed"); + futures.forEach(f -> f.completeExceptionally(ioe)); + return futures; + } long heapSize = 0; for (Mutation mutation : mutations) { heapSize += mutation.heapSize(); @@ -128,23 +179,27 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { validatePut((Put) mutation, maxKeyValueSize); } } - synchronized (this) { - if (closed) { - IOException ioe = new IOException("Already closed"); - futures.forEach(f -> f.completeExceptionally(ioe)); - return futures; - } + Batch batch = null; + lock.lock(); + try { if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) { periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> { - synchronized (AsyncBufferedMutatorImpl.this) { - // confirm that we are still valid, if there is already an internalFlush call before us, - // then we should not execute anymore. And in internalFlush we will set periodicFlush + Batch flushBatch = null; + lock.lock(); + try { + // confirm that we are still valid, if there is already a drainBatch call before us, + // then we should not execute anymore. And in drainBatch we will set periodicFlush // to null, and since we may schedule a new one, so here we check whether the references // are equal. 
if (timeout == periodicFlushTask) { periodicFlushTask = null; - internalFlush(); + flushBatch = drainBatch(); // Drains under lock } + } finally { + lock.unlock(); + } + if (flushBatch != null) { + sendBatch(flushBatch); // Sends outside of lock } }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); } @@ -153,24 +208,55 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { bufferedSize += heapSize; if (bufferedSize >= writeBufferSize) { LOG.trace("Flushing because write buffer size {} reached", writeBufferSize); - internalFlush(); + // drain now and send after releasing the lock + batch = drainBatch(); } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) { LOG.trace("Flushing because max mutations {} reached", maxMutations); - internalFlush(); + batch = drainBatch(); } + } finally { + lock.unlock(); + } + // Send outside of lock + if (batch != null) { + sendBatch(batch); } return futures; } + // The only difference between flush and close is that, we will set closed to true before sending + // out the batch to prevent further flush or close + private void flushOrClose(boolean close) { + Batch batch = null; + if (!closed) { + lock.lock(); + try { + if (!closed) { + // Drains under lock + batch = drainBatch(); + if (close) { + closed = true; + } + } + } finally { + lock.unlock(); + } + } + // Send the batch + if (batch != null) { + // Sends outside of lock + sendBatch(batch); + } + } + @Override - public synchronized void flush() { - internalFlush(); + public void flush() { + flushOrClose(false); } @Override - public synchronized void close() { - internalFlush(); - closed = true; + public void close() { + flushOrClose(true); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index 08e68def2c4..a6a6333cd08 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -258,7 +258,7 @@ public class TestAsyncBufferMutator { private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl { - private int flushCount; + private int drainCount; AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table, long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) { @@ -267,9 +267,9 @@ public class TestAsyncBufferMutator { } @Override - protected void internalFlush() { - flushCount++; - super.internalFlush(); + protected Batch drainBatch() { + drainCount++; + return super.drainBatch(); } } @@ -284,16 +284,19 @@ public class TestAsyncBufferMutator { Timeout task = mutator.periodicFlushTask; // we should have scheduled a periodic flush task assertNotNull(task); - synchronized (mutator) { - // synchronized on mutator to prevent periodic flush to be executed + // get the lock to prevent periodic flush to be executed + mutator.lock.lock(); + try { Thread.sleep(500); // the timeout should be issued assertTrue(task.isExpired()); - // but no flush is issued as we hold the lock - assertEquals(0, mutator.flushCount); + // but no drain is issued as we hold the lock + assertEquals(0, mutator.drainCount); assertFalse(future.isDone()); - // manually flush, then release the lock + // manually flush and drain, then release the lock mutator.flush(); + } finally { + mutator.lock.unlock(); } // this is a bit deep into the implementation in netty but anyway let's add a check here to // confirm that an issued timeout can not be canceled by netty framework. 
@@ -303,7 +306,7 @@ public class TestAsyncBufferMutator { AsyncTable<?> table = CONN.getTable(TABLE_NAME); assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); // only the manual flush, the periodic flush should have been canceled by us - assertEquals(1, mutator.flushCount); + assertEquals(1, mutator.drainCount); } } }
