This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a38281707f8 HBASE-29645 AsyncBufferedMutatorImpl concurrency
improvement
a38281707f8 is described below
commit a38281707f896b170dfe121367e4fff3e508c6ff
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 4 22:02:50 2025 +0800
HBASE-29645 AsyncBufferedMutatorImpl concurrency improvement
Close #7516
Close #7363
Co-authored-by: Duo Zhang <[email protected]>
Co-authored-by: David Manning <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 6ace9b895954a0fac67f70180078658c20b565da)
---
.../hbase/client/AsyncBufferedMutatorImpl.java | 154 ++++++++++++++++-----
.../hbase/master/assignment/AssignmentManager.java | 4 +-
.../hbase/client/TestAsyncBufferMutator.java | 23 +--
3 files changed, 135 insertions(+), 46 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index e5500b0977b..452318c1ef5 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -27,8 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -46,6 +45,18 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
+ private static final int INITIAL_CAPACITY = 100;
+
+ protected static class Batch {
+ final ArrayList<Mutation> toSend;
+ final ArrayList<CompletableFuture<Void>> toComplete;
+
+ Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>>
toComplete) {
+ this.toSend = toSend;
+ this.toComplete = toComplete;
+ }
+ }
+
private final HashedWheelTimer periodicalFlushTimer;
private final AsyncTable<?> table;
@@ -58,16 +69,20 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
private final int maxMutations;
- private List<Mutation> mutations = new ArrayList<>();
+ private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY);
- private List<CompletableFuture<Void>> futures = new ArrayList<>();
+ private ArrayList<CompletableFuture<Void>> futures = new
ArrayList<>(INITIAL_CAPACITY);
private long bufferedSize;
- private boolean closed;
+ private volatile boolean closed;
+ // Accessed by tests
Timeout periodicFlushTask;
+ // Accessed by tests
+ final ReentrantLock lock = new ReentrantLock();
+
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize,
int maxMutations) {
this.periodicalFlushTimer = periodicalFlushTimer;
@@ -88,23 +103,53 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
return table.getConfiguration();
}
- // will be overridden in test
- protected void internalFlush() {
+ /**
+ * Atomically drains the current buffered mutations and futures under {@link
#lock} and prepares
+ * this mutator to accept a new batch.
+ * <p>
+ * The {@link #lock} must be acquired before calling this method. Cancels
any pending
+ * {@link #periodicFlushTask} to avoid a redundant flush for the data we are
about to send. Swaps
+ * the shared {@link #mutations} and {@link #futures} lists into a returned
{@link Batch},
+ * replaces them with fresh lists, and resets {@link #bufferedSize} to zero.
+ * <p>
+ * If there is nothing buffered, returns {@code null} so callers can skip
sending work.
+ * <p>
+ * Protected for being overridden in tests.
+ * @return a {@link Batch} containing drained mutations and futures, or
{@code null} if empty
+ */
+ protected Batch drainBatch() {
+ ArrayList<Mutation> toSend;
+ ArrayList<CompletableFuture<Void>> toComplete;
+ // Cancel the flush task if it is pending.
if (periodicFlushTask != null) {
periodicFlushTask.cancel();
periodicFlushTask = null;
}
- List<Mutation> toSend = this.mutations;
+ toSend = this.mutations;
if (toSend.isEmpty()) {
- return;
+ return null;
}
- List<CompletableFuture<Void>> toComplete = this.futures;
+ toComplete = this.futures;
assert toSend.size() == toComplete.size();
- this.mutations = new ArrayList<>();
- this.futures = new ArrayList<>();
+ this.mutations = new ArrayList<>(INITIAL_CAPACITY);
+ this.futures = new ArrayList<>(INITIAL_CAPACITY);
bufferedSize = 0L;
- Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
- for (CompletableFuture<?> future : table.batch(toSend)) {
+ return new Batch(toSend, toComplete);
+ }
+
+ /**
+ * Sends a previously drained {@link Batch} and wires the user-visible
completion futures to the
+ * underlying results returned by {@link AsyncTable#batch(List)}.
+ * <p>
+ * Preserves the one-to-one, in-order mapping between mutations and their
corresponding futures.
+ * @param batch the drained batch to send; may be {@code null}
+ */
+ private void sendBatch(Batch batch) {
+ if (batch == null) {
+ return;
+ }
+ Iterator<CompletableFuture<Void>> toCompleteIter =
batch.toComplete.iterator();
+ for (CompletableFuture<?> future : table.batch(batch.toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
addListener(future, (r, e) -> {
if (e != null) {
@@ -118,9 +163,15 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
@Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation>
mutations) {
- List<CompletableFuture<Void>> futures =
- Stream.<CompletableFuture<Void>>
generate(CompletableFuture::new).limit(mutations.size())
- .collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
+ for (int i = 0, n = mutations.size(); i < n; i++) {
+ futures.add(new CompletableFuture<>());
+ }
+ if (closed) {
+ IOException ioe = new IOException("Already closed");
+ futures.forEach(f -> f.completeExceptionally(ioe));
+ return futures;
+ }
long heapSize = 0;
for (Mutation mutation : mutations) {
heapSize += mutation.heapSize();
@@ -128,23 +179,27 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
validatePut((Put) mutation, maxKeyValueSize);
}
}
- synchronized (this) {
- if (closed) {
- IOException ioe = new IOException("Already closed");
- futures.forEach(f -> f.completeExceptionally(ioe));
- return futures;
- }
+ Batch batch = null;
+ lock.lock();
+ try {
if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
- synchronized (AsyncBufferedMutatorImpl.this) {
- // confirm that we are still valid, if there is already an
internalFlush call before us,
- // then we should not execute any more. And in internalFlush we
will set periodicFlush
+ Batch flushBatch = null;
+ lock.lock();
+ try {
+ // confirm that we are still valid, if there is already a
drainBatch call before us,
+ // then we should not execute anymore. And in drainBatch we will
set periodicFlushTask
// to null, and since we may schedule a new one, so here we check
whether the references
// are equal.
if (timeout == periodicFlushTask) {
periodicFlushTask = null;
- internalFlush();
+ flushBatch = drainBatch(); // Drains under lock
}
+ } finally {
+ lock.unlock();
+ }
+ if (flushBatch != null) {
+ sendBatch(flushBatch); // Sends outside of lock
}
}, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
@@ -153,24 +208,55 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
LOG.trace("Flushing because write buffer size {} reached",
writeBufferSize);
- internalFlush();
+ // drain now and send after releasing the lock
+ batch = drainBatch();
} else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
LOG.trace("Flushing because max mutations {} reached", maxMutations);
- internalFlush();
+ batch = drainBatch();
}
+ } finally {
+ lock.unlock();
+ }
+ // Send outside of lock
+ if (batch != null) {
+ sendBatch(batch);
}
return futures;
}
+ // The only difference between flush and close is that close sets closed
to true before sending
+ // out the batch, preventing any further flush or close.
+ private void flushOrClose(boolean close) {
+ Batch batch = null;
+ if (!closed) {
+ lock.lock();
+ try {
+ if (!closed) {
+ // Drains under lock
+ batch = drainBatch();
+ if (close) {
+ closed = true;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ // Send the batch
+ if (batch != null) {
+ // Sends outside of lock
+ sendBatch(batch);
+ }
+ }
+
@Override
- public synchronized void flush() {
- internalFlush();
+ public void flush() {
+ flushOrClose(false);
}
@Override
- public synchronized void close() {
- internalFlush();
- closed = true;
+ public void close() {
+ flushOrClose(true);
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index c925a9222cc..d88c1e7b86f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -2315,8 +2315,8 @@ public class AssignmentManager {
node.crashed(scp.getSubmittedTime());
regionInTransitionTracker.regionCrashed(node);
} else {
- LOG.warn("Region {} should be on crashed region server {} but is
recorded on {}", regionInfo,
- crashedServerName, node.getRegionLocation());
+ LOG.warn("Region {} should be on crashed region server {} but is
recorded on {}",
+ regionInfo, crashedServerName, node.getRegionLocation());
}
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 2802c77b5dd..4ef62b1bc36 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -258,7 +258,7 @@ public class TestAsyncBufferMutator {
private static final class AsyncBufferMutatorForTest extends
AsyncBufferedMutatorImpl {
- private int flushCount;
+ private int drainCount;
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize,
int maxMutation) {
@@ -267,9 +267,9 @@ public class TestAsyncBufferMutator {
}
@Override
- protected void internalFlush() {
- flushCount++;
- super.internalFlush();
+ protected Batch drainBatch() {
+ drainCount++;
+ return super.drainBatch();
}
}
@@ -284,16 +284,19 @@ public class TestAsyncBufferMutator {
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
- synchronized (mutator) {
- // synchronized on mutator to prevent periodic flush to be executed
+ // hold the lock to prevent the periodic flush from executing
+ mutator.lock.lock();
+ try {
Thread.sleep(500);
// the timeout should be issued
assertTrue(task.isExpired());
- // but no flush is issued as we hold the lock
- assertEquals(0, mutator.flushCount);
+ // but no drain is issued as we hold the lock
+ assertEquals(0, mutator.drainCount);
assertFalse(future.isDone());
- // manually flush, then release the lock
+ // manually flush and drain, then release the lock
mutator.flush();
+ } finally {
+ mutator.lock.unlock();
}
// this is a bit deep into the implementation in netty but anyway let's
add a check here to
// confirm that an issued timeout can not be canceled by netty framework.
@@ -303,7 +306,7 @@ public class TestAsyncBufferMutator {
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new
Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
// only the manual flush, the periodic flush should have been canceled
by us
- assertEquals(1, mutator.flushCount);
+ assertEquals(1, mutator.drainCount);
}
}
}