This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 01f74ac0749 HBASE-27519 Another case for FNFE on StoreFileScanner
after a flush followed by a compaction (#4922)
01f74ac0749 is described below
commit 01f74ac0749a3ebd6a6ed760256e743e06eb8b20
Author: chenglei <[email protected]>
AuthorDate: Sat Dec 10 11:19:08 2022 +0800
HBASE-27519 Another case for FNFE on StoreFileScanner after a flush
followed by a compaction (#4922)
Signed-off-by: Wellington Chevreuil <[email protected]>
---
.../hbase/regionserver/ChangedReadersObserver.java | 8 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 30 +++++-
.../hadoop/hbase/regionserver/HStoreFile.java | 25 +++++
.../hadoop/hbase/regionserver/TestHStore.java | 102 +++++++++++++++++++++
4 files changed, 159 insertions(+), 6 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index d45a8046873..9ad93395a7e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
long getReadPoint();
/**
- * Notify observers.
+ * Notify observers. <br/>
+ * NOTE: Before we invoke this method, {@link HStoreFile#increaseRefCount} is
invoked for every
+ * {@link HStoreFile} in the 'sfs' input parameter to prevent the {@link HStoreFile}
from being archived after a
+ * concurrent compaction, and after this method is invoked, {@link
HStoreFile#decreaseRefCount} is
+ * invoked. So if you open the {@link StoreFileReader} or {@link
StoreFileScanner} asynchronously
+ * in this method, you may need to invoke {@link HStoreFile#increaseRefCount}
or
+ * {@link HStoreFile#decreaseRefCount} by yourself to prevent the {@link
HStoreFile}s from being archived.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a41c8da1607..ec47ebf8da8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -885,15 +885,29 @@ public class HStore
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
}
- private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws
IOException {
+ private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId)
throws IOException {
// NOTE:we should keep clearSnapshot method inside the write lock because
clearSnapshot may
// close {@link DefaultMemStore#snapshot}, which may be used by
// {@link DefaultMemStore#getScanners}.
storeEngine.addStoreFiles(sfs,
- snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
+ // NOTE: here we must increase the refCount for storeFiles because we
would open the
+ // storeFiles and get the StoreFileScanners for them in
HStore.notifyChangedReadersObservers.
+ // If we don't increase the refCount here,
HStore.closeAndArchiveCompactedFiles called by
+ // CompactedHFilesDischarger may archive the storeFiles after a
concurrent compaction. Because
+ // HStore.requestCompaction is under storeEngine lock, so here we
increase the refCount under
+ // storeEngine lock. see HBASE-27519 for more details.
+ snapshotId > 0 ? () -> {
+ this.memstore.clearSnapshot(snapshotId);
+ HStoreFile.increaseStoreFilesRefeCount(sfs);
+ } : () -> {
+ HStoreFile.increaseStoreFilesRefeCount(sfs);
});
// notify to be called here - only in case of flushes
- notifyChangedReadersObservers(sfs);
+ try {
+ notifyChangedReadersObservers(sfs);
+ } finally {
+ HStoreFile.decreaseStoreFilesRefeCount(sfs);
+ }
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
@@ -961,7 +975,13 @@ public class HStore
storeFilesToScan =
this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
- storeFilesToScan.stream().forEach(f ->
f.getFileInfo().refCount.incrementAndGet());
+ // NOTE: here we must increase the refCount for storeFiles because we
would open the
+ // storeFiles and get the StoreFileScanners for them. If we don't
increase the refCount here,
+ // HStore.closeAndArchiveCompactedFiles called by
CompactedHFilesDischarger may archive the
+ // storeFiles after a concurrent compaction. Because
HStore.requestCompaction is under
+ // storeEngine lock, so here we increase the refCount under storeEngine
lock. see HBASE-27484
+ // for more details.
+ HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.storeEngine.readUnlock();
}
@@ -982,7 +1002,7 @@ public class HStore
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
- storeFilesToScan.stream().forEach(f ->
f.getFileInfo().refCount.decrementAndGet());
+ HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 2b1acb86400..58d97a8743d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@@ -648,4 +651,26 @@ public class HStoreFile implements StoreFile {
Set<String> getCompactedStoreFiles() {
return Collections.unmodifiableSet(this.compactedStoreFiles);
}
+
+ long increaseRefCount() {
+ return this.fileInfo.refCount.incrementAndGet();
+ }
+
+ long decreaseRefCount() {
+ return this.fileInfo.refCount.decrementAndGet();
+ }
+
+ static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+ if (CollectionUtils.isEmpty(storeFiles)) {
+ return;
+ }
+ storeFiles.forEach(HStoreFile::increaseRefCount);
+ }
+
+ static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+ if (CollectionUtils.isEmpty(storeFiles)) {
+ return;
+ }
+ storeFiles.forEach(HStoreFile::decreaseRefCount);
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index a052520e54d..86187172569 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -1531,6 +1533,106 @@ public class TestHStore {
}
}
+ /**
+ * This test is for HBASE-27519: when the {@link StoreScanner} is
scanning, the Flush and the
+ * Compaction execute concurrently and the Compaction compacts and archives
the flushed
+ * {@link HStoreFile} which is used by {@link
StoreScanner#updateReaders}. Before
+ * HBASE-27519, {@link StoreScanner#updateReaders} would throw {@link
FileNotFoundException}.
+ */
+ @Test
+ public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently()
throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setBoolean(WALFactory.WAL_ENABLED, false);
+ conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
EverythingPolicy.class.getName());
+ byte[] r0 = Bytes.toBytes("row0");
+ byte[] r1 = Bytes.toBytes("row1");
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+ final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
+ // Initialize region
+ final MyStore myStore = initMyStore(name.getMethodName(), conf, new
MyStoreHook() {
+ @Override
+ public void getScanners(MyStore store) throws IOException {
+ try {
+ // Here this method is called by StoreScanner.updateReaders which is
invoked by the
+ // following TestHStore.flushStore
+ if (shouldWaitRef.get()) {
+ // wait the following compaction Task start
+ cyclicBarrier.await();
+ // wait the following HStore.closeAndArchiveCompactedFiles end.
+ cyclicBarrier.await();
+ }
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ final AtomicReference<Throwable> compactionExceptionRef = new
AtomicReference<Throwable>(null);
+ Runnable compactionTask = () -> {
+ try {
+ // Only when the StoreScanner.updateReaders invoked by
TestHStore.flushStore prepares for
+ // entering MyStore.getScanners can the compactionTask start.
+ cyclicBarrier.await();
+ region.compactStore(family, new NoLimitThroughputController());
+ myStore.closeAndArchiveCompactedFiles();
+ // Signal that StoreScanner.updateReaders can now enter MyStore.getScanners.
+ cyclicBarrier.await();
+ } catch (Throwable e) {
+ compactionExceptionRef.set(e);
+ }
+ };
+
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ byte[] value = Bytes.toBytes("value");
+ // older data which shouldn't be "seen" by client
+ myStore.add(createCell(r0, qf1, ts, seqId, value), null);
+ flushStore(myStore, id++);
+ myStore.add(createCell(r0, qf2, ts, seqId, value), null);
+ flushStore(myStore, id++);
+ myStore.add(createCell(r0, qf3, ts, seqId, value), null);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+
+ myStore.add(createCell(r1, qf1, ts, seqId, value), null);
+ myStore.add(createCell(r1, qf2, ts, seqId, value), null);
+ myStore.add(createCell(r1, qf3, ts, seqId, value), null);
+
+ Thread.currentThread()
+ .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently
thread");
+ Scan scan = new Scan();
+ scan.withStartRow(r0, true);
+ try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan,
quals, seqId)) {
+ List<Cell> results = new MyList<>(size -> {
+ switch (size) {
+ case 1:
+ shouldWaitRef.set(true);
+ Thread thread = new Thread(compactionTask);
+ thread.setName("MyCompacting Thread.");
+ thread.start();
+ try {
+ flushStore(myStore, id++);
+ thread.join();
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ shouldWaitRef.set(false);
+ break;
+ default:
+ break;
+ }
+ });
+ // Before HBASE-27519, here would throw java.io.FileNotFoundException
because the storeFile
+ // which used by StoreScanner.updateReaders is deleted by compactionTask.
+ scanner.next(results);
+ // The results is r0 row cells.
+ assertEquals(3, results.size());
+ assertTrue(compactionExceptionRef.get() == null);
+ }
+ }
+
@Test
public void testReclaimChunkWhenScaning() throws IOException {
init("testReclaimChunkWhenScaning");