This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 2e24bad  HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371) (#3398)
2e24bad is described below

commit 2e24bad8261d9f9558e5c4d63e60a1ccf38d0514
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Fri Jun 18 17:42:15 2021 -0700

    HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371) (#3398)
    
    Signed-off-by: Viraj Jasani [email protected]
    (cherry picked from commit 5a19bcf)
---
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 48 +++++++-----
 .../hadoop/hbase/regionserver/wal/SyncFuture.java  | 12 ++-
 .../hbase/regionserver/wal/SyncFutureCache.java    | 73 ++++++++++++++++++
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  | 87 ++++++++++++++++++++++
 .../regionserver/wal/TestSyncFutureCache.java      | 68 +++++++++++++++++
 5 files changed, 267 insertions(+), 21 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index bb09186..717d3a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -202,12 +202,7 @@ public class FSHLog implements WAL {
    */
   private final RingBufferEventHandler ringBufferEventHandler;
 
-  /**
-   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
-   * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
-   * <p>
-   */
-  private final ThreadLocal<SyncFuture> cachedSyncFutures;
+  private final SyncFutureCache syncFutureCache;
 
   /**
    * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
@@ -597,12 +592,7 @@ public class FSHLog implements WAL {
     this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
     this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
     this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
-    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
-      @Override
-      protected SyncFuture initialValue() {
-        return new SyncFuture();
-      }
-    };
+    this.syncFutureCache = new SyncFutureCache(conf);
     // Starting up threads in constructor is a no no; Interface should have an init call.
     this.disruptor.start();
   }
@@ -1126,6 +1116,10 @@ public class FSHLog implements WAL {
       // With disruptor down, this is safe to let go.
       if (this.appendExecutor !=  null) this.appendExecutor.shutdown();
 
+      if (syncFutureCache != null) {
+        syncFutureCache.clear();
+      }
+
       // Tell our listeners that the log is closing
       if (!this.listeners.isEmpty()) {
         for (WALActionsListener i : this.listeners) {
@@ -1496,7 +1490,8 @@ public class FSHLog implements WAL {
     return this.disruptor.getRingBuffer().next();
   }
 
-  private SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
+  @InterfaceAudience.Private
+  public SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
     long sequence = this.disruptor.getRingBuffer().next();
     return publishSyncOnRingBuffer(sequence, span, forceSync);
   }
@@ -1523,10 +1518,6 @@ public class FSHLog implements WAL {
       syncFuture.get(walSyncTimeout);
       return syncFuture.getSpan();
     } catch (TimeoutIOException tioe) {
-      // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
-      // still refer to it, so if this thread use it next time may get a wrong
-      // result.
-      this.cachedSyncFutures.remove();
       throw tioe;
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted", ie);
@@ -1544,7 +1535,7 @@ public class FSHLog implements WAL {
   }
 
   private SyncFuture getSyncFuture(final long sequence, Span span) {
-    return cachedSyncFutures.get().reset(sequence);
+    return syncFutureCache.getIfPresentOrNew().reset(sequence);
   }
 
   private void postSync(final long timeInNanos, final int handlerSyncs) {
@@ -1815,6 +1806,10 @@ public class FSHLog implements WAL {
       return syncFuture;
     }
 
+    boolean isSafePointAttained() {
+      return safePointAttainedLatch.getCount() == 0;
+    }
+
     /**
      * Called by Thread B when it attains the 'safe point'.  In this method, Thread B signals
      * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1902,7 +1897,7 @@ public class FSHLog implements WAL {
       for (int i = 0; i < this.syncFuturesCount.get(); i++) {
         this.syncFutures[i].done(sequence, e);
       }
-      this.syncFuturesCount.set(0);
+      offerDoneSyncsBackToCache();
     }
 
     /**
@@ -2018,12 +2013,25 @@ public class FSHLog implements WAL {
               new DamagedWALException("On sync", this.exception));
         }
         attainSafePoint(sequence);
-        this.syncFuturesCount.set(0);
+        // It is critical that we offer the futures back to the cache for reuse here after the
+        // safe point is attained and all the clean up has been done. There have been
+        // issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
+        offerDoneSyncsBackToCache();
       } catch (Throwable t) {
          LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
       }
     }
 
+    /**
+     * Offers the finished syncs back to the cache for reuse.
+     */
+    private void offerDoneSyncsBackToCache() {
+      for (int i = 0; i < this.syncFuturesCount.get(); i++) {
+        syncFutureCache.offer(syncFutures[i]);
+      }
+      this.syncFuturesCount.set(0);
+    }
+
     SafePointZigZagLatch attainSafePoint() {
       this.zigzagLatch = new SafePointZigZagLatch();
       return this.zigzagLatch;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index c9e893c..05798b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -115,7 +115,8 @@ class SyncFuture {
 
   @Override
   public synchronized String toString() {
-    return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
+    return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence +
+      " threadID=" + t.getId() + " threadName=" + t.getName();
   }
 
   synchronized long getRingBufferSequence() {
@@ -191,6 +192,15 @@ class SyncFuture {
     return this.doneSequence;
   }
 
+  /**
+   * Returns the thread that owned this sync future, use with caution as we return the reference to
+   * the actual thread object.
+   * @return the associated thread instance.
+   */
+  public Thread getThread() {
+    return t;
+  }
+
   public Long get(long timeout, TimeUnit unit)
   throws InterruptedException, ExecutionException {
     throw new UnsupportedOperationException();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
new file mode 100644
index 0000000..737c9e9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * A cache of {@link SyncFuture}s.  This class supports two methods
+ * {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(
+ * org.apache.hadoop.hbase.regionserver.wal.SyncFuture)}.
+ *
+ * Usage pattern:
+ *   SyncFuture sf = syncFutureCache.getIfPresentOrNew();
+ *   sf.reset(...);
+ *   // Use the sync future
+ *   finally: syncFutureCache.offer(sf);
+ *
+ * Offering the sync future back to the cache makes it eligible for reuse within the same thread
+ * context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
+ * unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
+ */
+public final class SyncFutureCache {
+
+  private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
+
+  private final Cache<Thread, SyncFuture> syncFutureCache;
+
+  public SyncFutureCache(final Configuration conf) {
+    final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
+      .expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
+  }
+
+  public SyncFuture getIfPresentOrNew() {
+    // Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
+    SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
+    return (future == null) ? new SyncFuture() : future;
+  }
+
+  /**
+   * Offers the sync future back to the cache for reuse.
+   */
+  public void offer(SyncFuture syncFuture) {
+    // It is ok to overwrite an existing mapping.
+    syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
+  }
+
+  public void clear() {
+    if (syncFutureCache != null) {
+      syncFutureCache.invalidateAll();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 7d9d037..5d7ba92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -34,6 +35,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -69,6 +72,7 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -85,6 +89,8 @@ import org.junit.rules.TestName;
 public class TestFSHLog {
   private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
 
+  private static final long TEST_TIMEOUT_MS = 10000;
+
   protected static Configuration conf;
   protected static FileSystem fs;
   protected static Path dir;
@@ -162,6 +168,87 @@ public class TestFSHLog {
     }
   }
 
+  /**
+   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
+   */
+  @Test
+  public void testDeadlockWithSyncOverwrites() throws Exception {
+    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
+
+    class FailingWriter implements WALProvider.Writer {
+      @Override public void sync(boolean forceSync) throws IOException {
+        throw new IOException("Injected failure..");
+      }
+
+      @Override public void append(WAL.Entry entry) throws IOException {
+      }
+
+      @Override public long getLength() throws IOException {
+        return 0;
+      }
+      @Override public void close() throws IOException {
+      }
+    }
+
+    /*
+     * Custom FSHLog implementation with a conditional wait before attaining safe point.
+     */
+    class CustomFSHLog extends FSHLog {
+      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+        String prefix, String suffix) throws IOException {
+        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+      }
+
+      @Override
+      protected void beforeWaitOnSafePoint() {
+        try {
+          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    try (FSHLog log = new CustomFSHLog(fs, walRootDir, dir.toString(),
+      HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
+      log.setWriter(new FailingWriter());
+      Field ringBufferEventHandlerField =
+        FSHLog.class.getDeclaredField("ringBufferEventHandler");
+      ringBufferEventHandlerField.setAccessible(true);
+      FSHLog.RingBufferEventHandler ringBufferEventHandler =
+        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
+      // Force a safe point
+      final FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
+      try {
+        final SyncFuture future0 = log.publishSyncOnRingBuffer(null, false);
+        // Wait for the sync to be done.
+        Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
+          @Override
+          public boolean evaluate() throws Exception {
+            return future0.isDone();
+          }
+        });
+        // Publish another sync from the same thread, this should not overwrite the done sync.
+        SyncFuture future1 = log.publishSyncOnRingBuffer(null, false);
+        assertFalse(future1.isDone());
+        // Unblock the safe point trigger..
+        blockBeforeSafePoint.countDown();
+        // Wait for the safe point to be reached. With the deadlock in HBASE-25984, this is never
+        // possible, thus blocking the sync pipeline.
+        Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
+          @Override
+          public boolean evaluate() throws Exception {
+            return latch.isSafePointAttained();
+          }
+        });
+      } finally {
+        // Force release the safe point, for the clean up.
+        latch.releaseSafePoint();
+      }
+    }
+  }
+
   protected void addEdits(WAL log,
                           HRegionInfo hri,
                           HTableDescriptor htd,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
new file mode 100644
index 0000000..94d8dc7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestSyncFutureCache {
+
+  @Test
+  public void testSyncFutureCacheLifeCycle() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    final SyncFutureCache cache = new SyncFutureCache(conf);
+    try {
+      SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
+      assertNotNull(future0);
+      // Get another future from the same thread, should be different one.
+      SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
+      assertNotNull(future1);
+      assertNotSame(future0, future1);
+      cache.offer(future1);
+      // Should override.
+      cache.offer(future0);
+      SyncFuture future3 = cache.getIfPresentOrNew();
+      // Should return the cached entry that was first offered back.
+      assertEquals(future3, future0);
+      final SyncFuture[] future4 = new SyncFuture[1];
+      // From a different thread
+      Thread t = new Thread(new Runnable() {
+        @Override public void run() {
+          future4[0] = cache.getIfPresentOrNew().reset(4);
+        }
+      });
+      t.start();
+      t.join();
+      assertNotNull(future4[0]);
+      assertNotSame(future3, future4[0]);
+      // Clean up
+      cache.offer(future3);
+      cache.offer(future4[0]);
+    } finally {
+      cache.clear();
+    }
+  }
+}

Reply via email to