This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 33f175f HBASE-25984: Avoid premature reuse of sync futures in FSHLog
(#3371) (#3394)
33f175f is described below
commit 33f175fa03bdcb1227a86f4d4361d137a453e844
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Thu Jun 17 12:29:02 2021 -0700
HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371) (#3394)
Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit 5a19bcfa98b3ccd9f7fb1fb933248c808676d91c)
---
.../hbase/regionserver/wal/AbstractFSWAL.java | 22 ++----
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 14 +++-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 23 +++++-
.../hadoop/hbase/regionserver/wal/SyncFuture.java | 12 ++-
.../hbase/regionserver/wal/SyncFutureCache.java | 74 ++++++++++++++++++
.../hadoop/hbase/regionserver/wal/TestFSHLog.java | 91 ++++++++++++++++++++++
.../regionserver/wal/TestSyncFutureCache.java | 68 ++++++++++++++++
7 files changed, 284 insertions(+), 20 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c017e9f..7bf20cc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -295,11 +295,9 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
/**
- * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse
SyncFutures.
- * Thread local is used so JVM can GC the terminated thread for us. See
HBASE-21228
- * <p>
+ * A cache of sync futures reused by threads.
*/
- private final ThreadLocal<SyncFuture> cachedSyncFutures;
+ protected final SyncFutureCache syncFutureCache;
/**
* The class name of the runtime implementation, used as prefix for
logging/tracing.
@@ -455,12 +453,7 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
this.walSyncTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
conf.getLong("hbase.regionserver.hlog.sync.timeout",
DEFAULT_WAL_SYNC_TIMEOUT_MS)));
- this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
- @Override
- protected SyncFuture initialValue() {
- return new SyncFuture();
- }
- };
+ this.syncFutureCache = new SyncFutureCache(conf);
this.implClassName = getClass().getSimpleName();
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY,
HRegion.DEFAULT_WAL_HSYNC);
}
@@ -753,10 +746,6 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
}
}
} catch (TimeoutIOException tioe) {
- // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
- // still refer to it, so if this thread use it next time may get a wrong
- // result.
- this.cachedSyncFutures.remove();
throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
@@ -859,6 +848,9 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
rollWriterLock.lock();
try {
doShutdown();
+ if (syncFutureCache != null) {
+ syncFutureCache.clear();
+ }
} finally {
rollWriterLock.unlock();
}
@@ -905,7 +897,7 @@ public abstract class AbstractFSWAL<W extends WriterBase>
implements WAL {
}
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
- return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
+ return
syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
}
protected boolean isLogRollRequested() {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3c799bf..c9168e2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -253,6 +253,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter>
{
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
}
+ /**
+ * Helper that marks the future as DONE and offers it back to the cache.
+ */
+ private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable
t) {
+ future.done(txid, t);
+ syncFutureCache.offer(future);
+ }
+
private static boolean waitingRoll(int epochAndState) {
return (epochAndState & 1) != 0;
}
@@ -393,7 +401,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
- sync.done(txid, null);
+ markFutureDoneAndOffer(sync, txid, null);
iter.remove();
finished++;
if (addSyncTrace) {
@@ -415,7 +423,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long maxSyncTxid = highestSyncedTxid.get();
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
- sync.done(maxSyncTxid, null);
+ markFutureDoneAndOffer(sync, maxSyncTxid, null);
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
}
@@ -751,7 +759,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
// and fail them
- syncFutures.forEach(f -> f.done(f.getTxid(), error));
+ syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
if (!(consumeExecutor instanceof EventLoop)) {
consumeExecutor.shutdown();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2aae540..9674aca 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -860,6 +860,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
/**
+ * @return if the safepoint has been attained.
+ */
+ @InterfaceAudience.Private
+ boolean isSafePointAttained() {
+ return this.safePointAttainedLatch.getCount() == 0;
+ }
+
+ /**
* Called by Thread B when it attains the 'safe point'. In this method,
Thread B signals Thread
* A it can proceed. Thread B will be held in here until {@link
#releaseSafePoint()} is called
* by Thread A.
@@ -945,6 +953,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
this.syncFutures[i].done(sequence, e);
}
+ offerDoneSyncsBackToCache();
+ }
+
+ /**
+ * Offers the finished syncs back to the cache for reuse.
+ */
+ private void offerDoneSyncsBackToCache() {
+ for (int i = 0; i < this.syncFuturesCount.get(); i++) {
+ syncFutureCache.offer(syncFutures[i]);
+ }
this.syncFuturesCount.set(0);
}
@@ -1059,7 +1077,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
? this.exception : new DamagedWALException("On sync",
this.exception));
}
attainSafePoint(sequence);
- this.syncFuturesCount.set(0);
+ // It is critical that we offer the futures back to the cache for
reuse here after the
+ // safe point is attained and all the clean up has been done. There
have been
+ // issues with reusing sync futures early causing WAL lockups, see
HBASE-25984.
+ offerDoneSyncsBackToCache();
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" +
this.syncFutures.length, t);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 650f68c..edba5df 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -90,7 +90,8 @@ class SyncFuture {
@Override
public synchronized String toString() {
- return "done=" + isDone() + ", txid=" + this.txid;
+ return "done=" + isDone() + ", txid=" + this.txid + " threadID=" +
t.getId() +
+ " threadName=" + t.getName();
}
synchronized long getTxid() {
@@ -107,6 +108,15 @@ class SyncFuture {
}
/**
+ * Returns the thread that owned this sync future, use with caution as we
return the reference to
+ * the actual thread object.
+ * @return the associated thread instance.
+ */
+ Thread getThread() {
+ return t;
+ }
+
+ /**
* @param txid the transaction id at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't'
is the error).
* @return True if we successfully marked this outstanding future as
completed/done. Returns false
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
new file mode 100644
index 0000000..9dd2e0a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
+/**
+ * A cache of {@link SyncFuture}s. This class supports two methods
+ * {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(SyncFuture)}.
+ *
+ * Usage pattern:
+ * SyncFuture sf = syncFutureCache.getIfPresentOrNew();
+ * sf.reset(...);
+ * // Use the sync future
+ * finally: syncFutureCache.offer(sf);
+ *
+ * Offering the sync future back to the cache makes it eligible for reuse within the same thread
+ * context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
+ * unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
+ */
+@InterfaceAudience.Private
+public final class SyncFutureCache {
+
+  private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
+
+  private final Cache<Thread, SyncFuture> syncFutureCache;
+
+  public SyncFutureCache(final Configuration conf) {
+    final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
+        .expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
+  }
+
+  public SyncFuture getIfPresentOrNew() {
+    // Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
+    SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
+    return (future == null) ? new SyncFuture() : future;
+  }
+
+  /**
+   * Offers the sync future back to the cache for reuse.
+   */
+  public void offer(SyncFuture syncFuture) {
+    // It is ok to overwrite an existing mapping.
+    syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
+  }
+
+  public void clear() {
+    if (syncFutureCache != null) {
+      syncFutureCache.invalidateAll();
+    }
+  }
+}
\ No newline at end of file
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 0376dfa..30bdea7 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -27,6 +29,7 @@ import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -49,8 +53,10 @@ import
org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLog.class);
+ private static final long TEST_TIMEOUT_MS = 10000;
+
@Rule
public TestName name = new TestName();
@@ -132,6 +140,89 @@ public class TestFSHLog extends AbstractTestFSWAL {
}
/**
+   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
+   */
+  @Test
+  public void testDeadlockWithSyncOverwrites() throws Exception {
+    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
+
+    class FailingWriter implements WALProvider.Writer {
+      @Override public void sync(boolean forceSync) throws IOException {
+        throw new IOException("Injected failure..");
+      }
+
+      @Override public void append(WAL.Entry entry) throws IOException {
+      }
+
+      @Override public long getLength() {
+        return 0;
+      }
+
+      @Override
+      public long getSyncedLength() {
+        return 0;
+      }
+
+      @Override public void close() throws IOException {
+      }
+    }
+
+    /*
+     * Custom FSHLog implementation with a conditional wait before attaining safe point.
+     */
+    class CustomFSHLog extends FSHLog {
+      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+          Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+          String prefix, String suffix) throws IOException {
+        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+      }
+
+      @Override
+      protected void beforeWaitOnSafePoint() {
+        try {
+          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      public SyncFuture publishSyncOnRingBuffer() {
+        long sequence = getSequenceOnRingBuffer();
+        return publishSyncOnRingBuffer(sequence, false);
+      }
+    }
+
+    final String name = this.name.getMethodName();
+    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
+        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
+      log.setWriter(new FailingWriter());
+      Field ringBufferEventHandlerField =
+          FSHLog.class.getDeclaredField("ringBufferEventHandler");
+      ringBufferEventHandlerField.setAccessible(true);
+      FSHLog.RingBufferEventHandler ringBufferEventHandler =
+          (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
+      // Force a safe point
+      FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
+      try {
+        SyncFuture future0 = log.publishSyncOnRingBuffer();
+        // Wait for the sync to be done.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
+        // Publish another sync from the same thread, this should not overwrite the done sync.
+        SyncFuture future1 = log.publishSyncOnRingBuffer();
+        assertFalse(future1.isDone());
+        // Unblock the safe point trigger..
+        blockBeforeSafePoint.countDown();
+        // Wait for the safe point to be reached.
+        // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
+      } finally {
+        // Force release the safe point, for the clean up.
+        latch.releaseSafePoint();
+      }
+    }
+  }
+
+  /**
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
*/
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
new file mode 100644
index 0000000..070aaf2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestSyncFutureCache {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncFutureCache.class);
+
+  @Test
+  public void testSyncFutureCacheLifeCycle() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    SyncFutureCache cache = new SyncFutureCache(conf);
+    try {
+      SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
+      assertNotNull(future0);
+      // Get another future from the same thread, should be different one.
+      SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
+      assertNotNull(future1);
+      assertNotSame(future0, future1);
+      cache.offer(future1);
+      // Should override.
+      cache.offer(future0);
+      SyncFuture future3 = cache.getIfPresentOrNew();
+      assertEquals(future3, future0);
+      final SyncFuture[] future4 = new SyncFuture[1];
+      // From a different thread
+      CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get();
+      assertNotNull(future4[0]);
+      assertNotSame(future3, future4[0]);
+      // Clean up
+      cache.offer(future3);
+      cache.offer(future4[0]);
+    } finally {
+      cache.clear();
+    }
+  }
+}