This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 01f74ac0749 HBASE-27519 Another case for FNFE on StoreFileScanner 
after a flush followed by a compaction (#4922)
01f74ac0749 is described below

commit 01f74ac0749a3ebd6a6ed760256e743e06eb8b20
Author: chenglei <[email protected]>
AuthorDate: Sat Dec 10 11:19:08 2022 +0800

    HBASE-27519 Another case for FNFE on StoreFileScanner after a flush 
followed by a compaction (#4922)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
---
 .../hbase/regionserver/ChangedReadersObserver.java |   8 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  30 +++++-
 .../hadoop/hbase/regionserver/HStoreFile.java      |  25 +++++
 .../hadoop/hbase/regionserver/TestHStore.java      | 102 +++++++++++++++++++++
 4 files changed, 159 insertions(+), 6 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index d45a8046873..9ad93395a7e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
   long getReadPoint();
 
   /**
-   * Notify observers.
+   * Notify observers. <br/>
+   * NOTE: Before we invoke this method, {@link HStoreFile#increaseRefCount} is 
invoked for every
+   * {@link HStoreFile} in the 'sfs' input parameter so that it is not 
archived after a
+   * concurrent compaction, and after this method returns, {@link 
HStoreFile#decreaseRefCount} is
+   * invoked. So if you open the {@link StoreFileReader} or {@link 
StoreFileScanner} asynchronously
+   * in this method, you may need to invoke {@link HStoreFile#increaseRefCount} 
and
+   * {@link HStoreFile#decreaseRefCount} yourself so the {@link 
HStoreFile}s are not archived.
    * @param sfs              The new files
    * @param memStoreScanners scanner of current memstore
    * @throws IOException e
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a41c8da1607..ec47ebf8da8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -885,15 +885,29 @@ public class HStore
     return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
   }
 
-  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws 
IOException {
+  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) 
throws IOException {
     // NOTE:we should keep clearSnapshot method inside the write lock because 
clearSnapshot may
     // close {@link DefaultMemStore#snapshot}, which may be used by
     // {@link DefaultMemStore#getScanners}.
     storeEngine.addStoreFiles(sfs,
-      snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
+      // NOTE: here we must increase the refCount for storeFiles because we 
would open the
+      // storeFiles and get the StoreFileScanners for them in 
HStore.notifyChangedReadersObservers.
+      // If we don't increase the refCount here, 
HStore.closeAndArchiveCompactedFiles called by
+      // CompactedHFilesDischarger may archive the storeFiles after a 
concurrent compaction.
+      // Because HStore.requestCompaction is under storeEngine lock, we 
increase the refCount under
+      // storeEngine lock. See HBASE-27519 for more details.
+      snapshotId > 0 ? () -> {
+        this.memstore.clearSnapshot(snapshotId);
+        HStoreFile.increaseStoreFilesRefeCount(sfs);
+      } : () -> {
+        HStoreFile.increaseStoreFilesRefeCount(sfs);
       });
     // notify to be called here - only in case of flushes
-    notifyChangedReadersObservers(sfs);
+    try {
+      notifyChangedReadersObservers(sfs);
+    } finally {
+      HStoreFile.decreaseStoreFilesRefeCount(sfs);
+    }
     if (LOG.isTraceEnabled()) {
       long totalSize = getTotalSize(sfs);
       String traceMessage = "FLUSH time,count,size,store size,store files ["
@@ -961,7 +975,13 @@ public class HStore
       storeFilesToScan = 
this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
-      storeFilesToScan.stream().forEach(f -> 
f.getFileInfo().refCount.incrementAndGet());
+      // NOTE: here we must increase the refCount for storeFiles because we 
would open the
+      // storeFiles and get the StoreFileScanners for them. If we don't 
increase the refCount here,
+      // HStore.closeAndArchiveCompactedFiles called by 
CompactedHFilesDischarger may archive the
+      // storeFiles after a concurrent compaction. Because 
HStore.requestCompaction is under
+      // storeEngine lock, we increase the refCount under storeEngine 
lock. See HBASE-27484
+      // for more details.
+      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
     } finally {
       this.storeEngine.readUnlock();
     }
@@ -982,7 +1002,7 @@ public class HStore
       clearAndClose(memStoreScanners);
       throw t instanceof IOException ? (IOException) t : new IOException(t);
     } finally {
-      storeFilesToScan.stream().forEach(f -> 
f.getFileInfo().refCount.decrementAndGet());
+      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 2b1acb86400..58d97a8743d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -648,4 +651,26 @@ public class HStoreFile implements StoreFile {
   Set<String> getCompactedStoreFiles() {
     return Collections.unmodifiableSet(this.compactedStoreFiles);
   }
+
+  long increaseRefCount() {
+    return this.fileInfo.refCount.incrementAndGet();
+  }
+
+  long decreaseRefCount() {
+    return this.fileInfo.refCount.decrementAndGet();
+  }
+
+  static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::increaseRefCount);
+  }
+
+  static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::decreaseRefCount);
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index a052520e54d..86187172569 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.ListIterator;
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -1531,6 +1533,106 @@ public class TestHStore {
     }
   }
 
+  /**
+   * This test is for HBASE-27519, when the {@link StoreScanner} is 
scanning, the Flush and the
+   * Compaction execute concurrently and the Compaction compacts and archives 
the flushed
+   * {@link HStoreFile} which is used by {@link 
StoreScanner#updateReaders}. Before
+   * HBASE-27519, {@link StoreScanner#updateReaders} would throw {@link 
FileNotFoundException}.
+   */
+  @Test
+  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() 
throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, 
EverythingPolicy.class.getName());
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] r1 = Bytes.toBytes("row1");
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
+    // Initialize region
+    final MyStore myStore = initMyStore(name.getMethodName(), conf, new 
MyStoreHook() {
+      @Override
+      public void getScanners(MyStore store) throws IOException {
+        try {
+          // Here this method is called by StoreScanner.updateReaders which is 
invoked by the
+          // following TestHStore.flushStore
+          if (shouldWaitRef.get()) {
+            // wait for the following compaction task to start
+            cyclicBarrier.await();
+            // wait for the following HStore.closeAndArchiveCompactedFiles to end.
+            cyclicBarrier.await();
+          }
+        } catch (BrokenBarrierException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    final AtomicReference<Throwable> compactionExceptionRef = new 
AtomicReference<Throwable>(null);
+    Runnable compactionTask = () -> {
+      try {
+        // Only when the StoreScanner.updateReaders invoked by 
TestHStore.flushStore is about to
+        // enter MyStore.getScanners can compactionTask start.
+        cyclicBarrier.await();
+        region.compactStore(family, new NoLimitThroughputController());
+        myStore.closeAndArchiveCompactedFiles();
+        // Notify StoreScanner.updateReaders that it can enter MyStore.getScanners.
+        cyclicBarrier.await();
+      } catch (Throwable e) {
+        compactionExceptionRef.set(e);
+      }
+    };
+
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // older data which shouldn't be "seen" by client
+    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+
+    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
+
+    Thread.currentThread()
+      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently 
thread");
+    Scan scan = new Scan();
+    scan.withStartRow(r0, true);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, 
quals, seqId)) {
+      List<Cell> results = new MyList<>(size -> {
+        switch (size) {
+          case 1:
+            shouldWaitRef.set(true);
+            Thread thread = new Thread(compactionTask);
+            thread.setName("MyCompacting Thread.");
+            thread.start();
+            try {
+              flushStore(myStore, id++);
+              thread.join();
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+            shouldWaitRef.set(false);
+            break;
+          default:
+            break;
+        }
+      });
+      // Before HBASE-27519, this would throw java.io.FileNotFoundException 
because the storeFile
+      // which is used by StoreScanner.updateReaders is deleted by compactionTask.
+      scanner.next(results);
+      // The results are the cells of row r0.
+      assertEquals(3, results.size());
+      assertTrue(compactionExceptionRef.get() == null);
+    }
+  }
+
   @Test
   public void testReclaimChunkWhenScaning() throws IOException {
     init("testReclaimChunkWhenScaning");

Reply via email to