Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 9b1f379f2 -> a4116b243


HBASE-19468 FNFE during scans and flushes (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a4116b24
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a4116b24
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a4116b24

Branch: refs/heads/branch-1.3
Commit: a4116b24347387909a0c717829d5cb616e6e1aef
Parents: 9b1f379
Author: ramkrish86 <ramkrishna.s.vasude...@gmail.com>
Authored: Wed Dec 20 17:02:56 2017 +0530
Committer: Francis Liu <tof...@apache.org>
Committed: Wed Feb 14 19:01:43 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java | 29 +++++++++---
 .../TestCompactedHFilesDischarger.java          | 46 +++++++++++++++++++-
 2 files changed, 68 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a4116b24/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d42852a..c95151b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.CountDownLatch;
@@ -129,8 +130,10 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
   private boolean scanUsePread = false;
   // Indicates whether there was flush during the course of the scan
   private volatile boolean flushed = false;
+
   // generally we get one file from a flush
-  private final List<StoreFile> flushedStoreFiles = new 
ArrayList<StoreFile>(1);
+  private final List<KeyValueScanner> flushedstoreFileScanners =
+      new ArrayList<KeyValueScanner>(1);
   // generally we get one memstore scanner from a flush
   private final List<KeyValueScanner> memStoreScannersAfterFlush = new 
ArrayList<>(1);
   // The current list of scanners
@@ -444,6 +447,10 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     this.closing = true;
     clearAndClose(scannersForDelayedClose);
     clearAndClose(memStoreScannersAfterFlush);
+    // clear them at any case. In case scanner.next() was never called
+    // and there were some lease expiry we need to close all the scanners
+    // on the flushed files which are open
+    clearAndClose(flushedstoreFileScanners);
     // Under test, we don't have a this.store
     if (this.store != null)
       this.store.deleteChangedReaderObserver(this);
@@ -803,7 +810,17 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     flushLock.lock();
     try {
       flushed = true;
-      flushedStoreFiles.addAll(sfs);
+      final boolean isCompaction = false;
+      boolean usePread = get || scanUsePread;
+      // SEE HBASE-19468 where the flushed files are getting compacted even 
before a scanner
+      // calls next(). So its better we create scanners here rather than 
next() call. Ensure
+      // these scanners are properly closed() whether or not the scan is 
completed successfully
+      // Eagerly creating scanners so that we have the ref counting ticking on 
the newly created
+      // store files. In case of stream scanners this eager creation does not 
induce performance
+      // penalty because in scans (that uses stream scanners) the next() call 
is bound to happen.   
+      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, 
get, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), 
this.readPt, false);
+      flushedstoreFileScanners.addAll(scanners);
       if (!CollectionUtils.isEmpty(memStoreScanners)) {
         clearAndClose(memStoreScannersAfterFlush);
         memStoreScannersAfterFlush.addAll(memStoreScanners);
@@ -871,13 +888,13 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> scanners = null;
     flushLock.lock();
     try {
-      List<KeyValueScanner> allScanners = new 
ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
-      allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, 
get, usePread,
-        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), 
this.readPt, false));
+      List<KeyValueScanner> allScanners =
+          new ArrayList<>(flushedstoreFileScanners.size() + 
memStoreScannersAfterFlush.size());
+      allScanners.addAll(flushedstoreFileScanners);
       allScanners.addAll(memStoreScannersAfterFlush);
       scanners = selectScannersFrom(allScanners);
       // Clear the current set of flushed store files so that they don't get 
added again
-      flushedStoreFiles.clear();
+      flushedstoreFileScanners.clear();
       memStoreScannersAfterFlush.clear();
     } finally {
       flushLock.unlock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4116b24/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index c23e794..816c357 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -336,6 +337,49 @@ public class TestCompactedHFilesDischarger {
     assertTrue(compactedfiles.size() == 0);
   }
 
+  @Test
+  public void testStoreFileMissing() throws Exception {
+    // Write 3 records and create 3 store files.
+    write("row1");
+    region.flush(true);
+    write("row2");
+    region.flush(true);
+    write("row3");
+    region.flush(true);
+
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    RegionScanner scanner = region.getScanner(scan);
+    List<Cell> res = new ArrayList<Cell>();
+    // Read first item
+    scanner.next(res);
+    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+    res.clear();
+    // Create a new file in between scan nexts
+    write("row4");
+    region.flush(true);
+
+    // Compact the table
+    region.compact(true);
+
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    cleaner.chore();
+    // This issues scan next
+    scanner.next(res);
+    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+
+    scanner.close();
+  }
+
+  private void write(String row1) throws IOException {
+    byte[] row = Bytes.toBytes(row1);
+    Put put = new Put(row);
+    put.addColumn(fam, qual1, row);
+    region.put(put);
+  }
+
   protected void countDown() {
     // count down 3 times
     latch.countDown();
@@ -369,7 +413,7 @@ public class TestCompactedHFilesDischarger {
       try {
         initiateScan(region);
       } catch (IOException e) {
-        // do nothing
+        e.printStackTrace();
       }
     }
 

Reply via email to