This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 58f067d  HBASE-25998: Redo synchronization in SyncFuture (#3400)
58f067d is described below

commit 58f067dc4d29a1fdfe59d3cf6541a11cb10425c8
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Fri Jun 18 10:37:01 2021 -0700

    HBASE-25998: Redo synchronization in SyncFuture (#3400)
    
    SyncFuture currently uses a coarse-grained synchronized approach that
    seems to create a lot of contention. This patch:
    
    - Uses a reentrant lock instead of a synchronized monitor
    - Switches to condition-variable-based waiting rather than a busy wait
    - Removes synchronization from fields that do not need it
    
    Signed-off-by: Michael Stack <[email protected]>
    Signed-off-by: Andrew Purtell <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Viraj Jasani <[email protected]>
    (cherry picked from commit 6bafb596421974717697b28d0856453245759c15)
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   2 +-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  10 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   5 +-
 .../hadoop/hbase/regionserver/wal/SyncFuture.java  | 143 +++++++++++++--------
 .../hbase/regionserver/wal/TestSyncFuture.java     |   4 +-
 .../regionserver/wal/TestSyncFutureCache.java      |   7 +-
 6 files changed, 101 insertions(+), 70 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 20600fc..6eb435d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -961,7 +961,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
   }
 
   protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
-    return 
syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
+    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
   }
 
   protected boolean isLogRollRequested() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3bb0a63..ae26a47 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -31,7 +31,6 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -133,10 +132,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> 
{
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
 
-  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
-    int c = Long.compare(o1.getTxid(), o2.getTxid());
-    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), 
System.identityHashCode(o2));
-  };
+  private static final Comparator<SyncFuture> SEQ_COMPARATOR = 
Comparator.comparingLong(
+      SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
 
   public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
   public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
@@ -373,7 +370,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   // sync futures then just use the default one.
   private boolean isHsync(long beginTxid, long endTxid) {
     SortedSet<SyncFuture> futures =
-      syncFutures.subSet(new SyncFuture().reset(beginTxid), new 
SyncFuture().reset(endTxid + 1));
+      syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
+          new SyncFuture().reset(endTxid + 1, false));
     if (futures.isEmpty()) {
       return useHsync;
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index ba1c640..690f545 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -888,8 +888,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     private volatile CountDownLatch safePointReleasedLatch = new 
CountDownLatch(1);
 
     private void checkIfSyncFailed(SyncFuture syncFuture) throws 
FailedSyncBeforeLogCloseException {
-      if (syncFuture.isThrowable()) {
-        throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
+      Throwable t = syncFuture.getThrowable();
+      if (t != null) {
+        throw new FailedSyncBeforeLogCloseException(t);
       }
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index edba5df..862e918 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 class SyncFuture {
-  // Implementation notes: I tried using a cyclicbarrier in here for handler 
and sync threads
-  // to coordinate on but it did not give any obvious advantage and some 
issues with order in which
-  // events happen.
+
   private static final long NOT_DONE = -1L;
+  private Thread t;
 
   /**
-   * The transaction id of this operation, monotonically increases.
+   * Lock protecting the thread-safe fields.
+   */
+  private final ReentrantLock doneLock;
+
+  /**
+   * Condition to wait on for client threads.
+   */
+  private final Condition doneCondition;
+
+  /*
+   * Fields below are protected by {@link SyncFuture#doneLock}.
    */
-  private long txid;
 
   /**
    * The transaction id that was set in here when we were marked done. Should 
be equal or > txnId.
@@ -65,16 +74,30 @@ class SyncFuture {
    */
   private Throwable throwable;
 
-  private Thread t;
+  /*
+   * Fields below are created once at reset() and accessed without any lock. 
Should be ok as they
+   * are immutable for this instance of sync future until it is reset.
+   */
+
+  /**
+   * The transaction id of this operation, monotonically increases.
+   */
+  private long txid;
 
   private boolean forceSync;
 
+  SyncFuture() {
+    this.doneLock = new ReentrantLock();
+    this.doneCondition = doneLock.newCondition();
+  }
+
   /**
    * Call this method to clear old usage and get it ready for new deploy.
+   *
    * @param txid the new transaction id
    * @return this
    */
-  synchronized SyncFuture reset(long txid) {
+  SyncFuture reset(long txid, boolean forceSync) {
     if (t != null && t != Thread.currentThread()) {
       throw new IllegalStateException();
     }
@@ -83,30 +106,26 @@ class SyncFuture {
       throw new IllegalStateException("" + txid + " " + 
Thread.currentThread());
     }
     this.doneTxid = NOT_DONE;
+    this.forceSync = forceSync;
     this.txid = txid;
     this.throwable = null;
     return this;
   }
 
   @Override
-  public synchronized String toString() {
+  public String toString() {
     return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + 
t.getId() +
         " threadName=" + t.getName();
   }
 
-  synchronized long getTxid() {
+  long getTxid() {
     return this.txid;
   }
 
-  synchronized boolean isForceSync() {
+  boolean isForceSync() {
     return forceSync;
   }
 
-  synchronized SyncFuture setForceSync(boolean forceSync) {
-    this.forceSync = forceSync;
-    return this;
-  }
-
   /**
    * Returns the thread that owned this sync future, use with caution as we 
return the reference to
    * the actual thread object.
@@ -122,55 +141,67 @@ class SyncFuture {
    * @return True if we successfully marked this outstanding future as 
completed/done. Returns false
    *         if this future is already 'done' when this method called.
    */
-  synchronized boolean done(final long txid, final Throwable t) {
-    if (isDone()) {
-      return false;
-    }
-    this.throwable = t;
-    if (txid < this.txid) {
-      // Something badly wrong.
-      if (throwable == null) {
-        this.throwable =
-            new IllegalStateException("done txid=" + txid + ", my txid=" + 
this.txid);
+  boolean done(final long txid, final Throwable t) {
+    doneLock.lock();
+    try {
+      if (doneTxid != NOT_DONE) {
+        return false;
+      }
+      this.throwable = t;
+      if (txid < this.txid) {
+        // Something badly wrong.
+        if (throwable == null) {
+          this.throwable =
+              new IllegalStateException("done txid=" + txid + ", my txid=" + 
this.txid);
+        }
       }
+      // Mark done.
+      this.doneTxid = txid;
+      doneCondition.signalAll();
+      return true;
+    } finally {
+      doneLock.unlock();
     }
-    // Mark done.
-    this.doneTxid = txid;
-    // Wake up waiting threads.
-    notify();
-    return true;
   }
 
-  boolean cancel(boolean mayInterruptIfRunning) {
-    throw new UnsupportedOperationException();
-  }
-
-  synchronized long get(long timeoutNs) throws InterruptedException,
+  long get(long timeoutNs) throws InterruptedException,
       ExecutionException, TimeoutIOException {
-    final long done = System.nanoTime() + timeoutNs;
-    while (!isDone()) {
-      wait(1000);
-      if (System.nanoTime() >= done) {
-        throw new TimeoutIOException(
-            "Failed to get sync result after " + 
TimeUnit.NANOSECONDS.toMillis(timeoutNs)
-                + " ms for txid=" + this.txid + ", WAL system stuck?");
+    doneLock.lock();
+    try {
+      while (doneTxid == NOT_DONE) {
+        if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
+          throw new TimeoutIOException("Failed to get sync result after "
+              + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + 
this.txid
+              + ", WAL system stuck?");
+        }
       }
+      if (this.throwable != null) {
+        throw new ExecutionException(this.throwable);
+      }
+      return this.doneTxid;
+    } finally {
+      doneLock.unlock();
     }
-    if (this.throwable != null) {
-      throw new ExecutionException(this.throwable);
-    }
-    return this.doneTxid;
   }
 
-  synchronized boolean isDone() {
-    return this.doneTxid != NOT_DONE;
-  }
-
-  synchronized boolean isThrowable() {
-    return isDone() && getThrowable() != null;
+  boolean isDone() {
+    doneLock.lock();
+    try {
+      return this.doneTxid != NOT_DONE;
+    } finally {
+      doneLock.unlock();
+    }
   }
 
-  synchronized Throwable getThrowable() {
-    return this.throwable;
+  Throwable getThrowable() {
+    doneLock.lock();
+    try {
+      if (doneTxid == NOT_DONE) {
+        return null;
+      }
+      return this.throwable;
+    } finally {
+      doneLock.unlock();
+    }
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
index 1b2477c..6495653 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java
@@ -38,10 +38,10 @@ public class TestSyncFuture {
   public void testGet() throws Exception {
     long timeout = 5000;
     long txid = 100000;
-    SyncFuture syncFulture = new SyncFuture().reset(txid);
+    SyncFuture syncFulture = new SyncFuture().reset(txid, false);
     syncFulture.done(txid, null);
     assertEquals(txid, syncFulture.get(timeout));
 
-    syncFulture.reset(txid).get(timeout);
+    syncFulture.reset(txid, false).get(timeout);
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
index 070aaf2..dd4590a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
@@ -42,10 +42,10 @@ public class TestSyncFutureCache {
     final Configuration conf = HBaseConfiguration.create();
     SyncFutureCache cache = new SyncFutureCache(conf);
     try {
-      SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
+      SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
       assertNotNull(future0);
       // Get another future from the same thread, should be different one.
-      SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
+      SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
       assertNotNull(future1);
       assertNotSame(future0, future1);
       cache.offer(future1);
@@ -55,7 +55,8 @@ public class TestSyncFutureCache {
       assertEquals(future3, future0);
       final SyncFuture[] future4 = new SyncFuture[1];
       // From a different thread
-      CompletableFuture.runAsync(() -> future4[0] = 
cache.getIfPresentOrNew().reset(4)).get();
+      CompletableFuture.runAsync(() ->
+          future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
       assertNotNull(future4[0]);
       assertNotSame(future3, future4[0]);
       // Clean up

Reply via email to