This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 58f067d HBASE-25998: Redo synchronization in SyncFuture (#3400)
58f067d is described below
commit 58f067dc4d29a1fdfe59d3cf6541a11cb10425c8
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Fri Jun 18 10:37:01 2021 -0700
HBASE-25998: Redo synchronization in SyncFuture (#3400)
SyncFuture currently uses a coarse-grained synchronized approach that seems to
create a lot of contention. This patch:
- Uses a reentrant lock instead of a synchronized monitor
- Switches to condition-variable-based waiting rather than busy waiting
- Removes synchronization from fields that do not need it
Signed-off-by: Michael Stack <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit 6bafb596421974717697b28d0856453245759c15)
---
.../hbase/regionserver/wal/AbstractFSWAL.java | 2 +-
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 10 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 5 +-
.../hadoop/hbase/regionserver/wal/SyncFuture.java | 143 +++++++++++++--------
.../hbase/regionserver/wal/TestSyncFuture.java | 4 +-
.../regionserver/wal/TestSyncFutureCache.java | 7 +-
6 files changed, 101 insertions(+), 70 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 20600fc..6eb435d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -961,7 +961,7 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
}
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
- return
syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
+ return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
}
protected boolean isLogRollRequested() {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3bb0a63..ae26a47 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -31,7 +31,6 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
-import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -133,10 +132,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter>
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
- private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
- int c = Long.compare(o1.getTxid(), o2.getTxid());
- return c != 0 ? c : Integer.compare(System.identityHashCode(o1),
System.identityHashCode(o2));
- };
+ private static final Comparator<SyncFuture> SEQ_COMPARATOR =
Comparator.comparingLong(
+ SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
@@ -373,7 +370,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// sync futures then just use the default one.
private boolean isHsync(long beginTxid, long endTxid) {
SortedSet<SyncFuture> futures =
- syncFutures.subSet(new SyncFuture().reset(beginTxid), new
SyncFuture().reset(endTxid + 1));
+ syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
+ new SyncFuture().reset(endTxid + 1, false));
if (futures.isEmpty()) {
return useHsync;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index ba1c640..690f545 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -888,8 +888,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private volatile CountDownLatch safePointReleasedLatch = new
CountDownLatch(1);
private void checkIfSyncFailed(SyncFuture syncFuture) throws
FailedSyncBeforeLogCloseException {
- if (syncFuture.isThrowable()) {
- throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
+ Throwable t = syncFuture.getThrowable();
+ if (t != null) {
+ throw new FailedSyncBeforeLogCloseException(t);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index edba5df..862e918 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
class SyncFuture {
- // Implementation notes: I tried using a cyclicbarrier in here for handler
and sync threads
- // to coordinate on but it did not give any obvious advantage and some
issues with order in which
- // events happen.
+
private static final long NOT_DONE = -1L;
+ private Thread t;
/**
- * The transaction id of this operation, monotonically increases.
+ * Lock protecting the thread-safe fields.
+ */
+ private final ReentrantLock doneLock;
+
+ /**
+ * Condition to wait on for client threads.
+ */
+ private final Condition doneCondition;
+
+ /*
+ * Fields below are protected by {@link SyncFuture#doneLock}.
*/
- private long txid;
/**
* The transaction id that was set in here when we were marked done. Should
be equal or > txnId.
@@ -65,16 +74,30 @@ class SyncFuture {
*/
private Throwable throwable;
- private Thread t;
+ /*
+ * Fields below are created once at reset() and accessed without any lock.
Should be ok as they
+ * are immutable for this instance of sync future until it is reset.
+ */
+
+ /**
+ * The transaction id of this operation, monotonically increases.
+ */
+ private long txid;
private boolean forceSync;
+ SyncFuture() {
+ this.doneLock = new ReentrantLock();
+ this.doneCondition = doneLock.newCondition();
+ }
+
/**
* Call this method to clear old usage and get it ready for new deploy.
+ *
* @param txid the new transaction id
* @return this
*/
- synchronized SyncFuture reset(long txid) {
+ SyncFuture reset(long txid, boolean forceSync) {
if (t != null && t != Thread.currentThread()) {
throw new IllegalStateException();
}
@@ -83,30 +106,26 @@ class SyncFuture {
throw new IllegalStateException("" + txid + " " +
Thread.currentThread());
}
this.doneTxid = NOT_DONE;
+ this.forceSync = forceSync;
this.txid = txid;
this.throwable = null;
return this;
}
@Override
- public synchronized String toString() {
+ public String toString() {
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" +
t.getId() +
" threadName=" + t.getName();
}
- synchronized long getTxid() {
+ long getTxid() {
return this.txid;
}
- synchronized boolean isForceSync() {
+ boolean isForceSync() {
return forceSync;
}
- synchronized SyncFuture setForceSync(boolean forceSync) {
- this.forceSync = forceSync;
- return this;
- }
-
/**
* Returns the thread that owned this sync future, use with caution as we
return the reference to
* the actual thread object.
@@ -122,55 +141,67 @@ class SyncFuture {
* @return True if we successfully marked this outstanding future as
completed/done. Returns false
* if this future is already 'done' when this method called.
*/
- synchronized boolean done(final long txid, final Throwable t) {
- if (isDone()) {
- return false;
- }
- this.throwable = t;
- if (txid < this.txid) {
- // Something badly wrong.
- if (throwable == null) {
- this.throwable =
- new IllegalStateException("done txid=" + txid + ", my txid=" +
this.txid);
+ boolean done(final long txid, final Throwable t) {
+ doneLock.lock();
+ try {
+ if (doneTxid != NOT_DONE) {
+ return false;
+ }
+ this.throwable = t;
+ if (txid < this.txid) {
+ // Something badly wrong.
+ if (throwable == null) {
+ this.throwable =
+ new IllegalStateException("done txid=" + txid + ", my txid=" +
this.txid);
+ }
}
+ // Mark done.
+ this.doneTxid = txid;
+ doneCondition.signalAll();
+ return true;
+ } finally {
+ doneLock.unlock();
}
- // Mark done.
- this.doneTxid = txid;
- // Wake up waiting threads.
- notify();
- return true;
}
- boolean cancel(boolean mayInterruptIfRunning) {
- throw new UnsupportedOperationException();
- }
-
- synchronized long get(long timeoutNs) throws InterruptedException,
+ long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException {
- final long done = System.nanoTime() + timeoutNs;
- while (!isDone()) {
- wait(1000);
- if (System.nanoTime() >= done) {
- throw new TimeoutIOException(
- "Failed to get sync result after " +
TimeUnit.NANOSECONDS.toMillis(timeoutNs)
- + " ms for txid=" + this.txid + ", WAL system stuck?");
+ doneLock.lock();
+ try {
+ while (doneTxid == NOT_DONE) {
+ if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
+ throw new TimeoutIOException("Failed to get sync result after "
+ + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" +
this.txid
+ + ", WAL system stuck?");
+ }
}
+ if (this.throwable != null) {
+ throw new ExecutionException(this.throwable);
+ }
+ return this.doneTxid;
+ } finally {
+ doneLock.unlock();
}
- if (this.throwable != null) {
- throw new ExecutionException(this.throwable);
- }
- return this.doneTxid;
}
- synchronized boolean isDone() {
- return this.doneTxid != NOT_DONE;
- }
-
- synchronized boolean isThrowable() {
- return isDone() && getThrowable() != null;
+ boolean isDone() {
+ doneLock.lock();
+ try {
+ return this.doneTxid != NOT_DONE;
+ } finally {
+ doneLock.unlock();
+ }
}
- synchronized Throwable getThrowable() {
- return this.throwable;
+ Throwable getThrowable() {
+ doneLock.lock();
+ try {
+ if (doneTxid == NOT_DONE) {
+ return null;
+ }
+ return this.throwable;
+ } finally {
+ doneLock.unlock();
+ }
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
index 1b2477c..6495653 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
@@ -38,10 +38,10 @@ public class TestSyncFuture {
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
- SyncFuture syncFulture = new SyncFuture().reset(txid);
+ SyncFuture syncFulture = new SyncFuture().reset(txid, false);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));
- syncFulture.reset(txid).get(timeout);
+ syncFulture.reset(txid, false).get(timeout);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
index 070aaf2..dd4590a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
@@ -42,10 +42,10 @@ public class TestSyncFutureCache {
final Configuration conf = HBaseConfiguration.create();
SyncFutureCache cache = new SyncFutureCache(conf);
try {
- SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
+ SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
assertNotNull(future0);
// Get another future from the same thread, should be different one.
- SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
+ SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
assertNotNull(future1);
assertNotSame(future0, future1);
cache.offer(future1);
@@ -55,7 +55,8 @@ public class TestSyncFutureCache {
assertEquals(future3, future0);
final SyncFuture[] future4 = new SyncFuture[1];
// From a different thread
- CompletableFuture.runAsync(() -> future4[0] =
cache.getIfPresentOrNew().reset(4)).get();
+ CompletableFuture.runAsync(() ->
+ future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
assertNotNull(future4[0]);
assertNotSame(future3, future4[0]);
// Clean up