This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch hubspot-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 35ec27d079cdb0fe8882486534dd95c95748884f
Author: Xiaolin Ha <haxiao...@apache.org>
AuthorDate: Mon Mar 7 11:21:39 2022 +0800

    HubSpot Backport: HBASE-25709 Close region may stuck when region is 
compacting and skipped most cells read (#3117)
    
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
---
 .../hadoop/hbase/regionserver/StoreScanner.java    |  5 ++
 .../hadoop/hbase/regionserver/TestHRegion.java     | 68 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/TestHStore.java      | 71 ++++++++++++++++++++++
 3 files changed, 144 insertions(+)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index e6495eca91f..d4558778c64 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -755,6 +755,11 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
           default:
             throw new RuntimeException("UNEXPECTED");
         }
+
+        // every cellsPerHeartbeatCheck scanned cells, return MORE_VALUES instead of looping on.
+        if (kvsScanned % cellsPerHeartbeatCheck == 0) {
+          return 
scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+        }
       } while ((cell = this.heap.peek()) != null);
 
       if (count > 0) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index bf5d7825252..5b4ce19476a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -7023,6 +7023,74 @@ public class TestHRegion {
     assertNull(r.getValue(fam1, q1));
   }
 
+  @Test
+  public void testTTLsUsingSmallHeartBeatCells() throws IOException {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    final byte[] row = Bytes.toBytes("testRow");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final byte[] q5 = Bytes.toBytes("q5");
+    final byte[] q6 = Bytes.toBytes("q6");
+    final byte[] q7 = Bytes.toBytes("q7");
+    final byte[] q8 = Bytes.toBytes("q8");
+
+    // 10 seconds
+    int ttlSecs = 10;
+    TableDescriptor tableDescriptor =
+      
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
+        
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+    // using small heart beat cells
+    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
+
+    region = HBaseTestingUtility
+      
.createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
+        TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
+    assertNotNull(region);
+    long now = EnvironmentEdgeManager.currentTime();
+    // Add cells stamped with the current time; they expire via the family TTL (ttlSecs)
+    region.put(new Put(row).addColumn(fam1, q1, now, 
HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q2, now, 
HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q3, now, 
HConstants.EMPTY_BYTE_ARRAY));
+    // Add cells stamped just past now + TTL; they must still be live after the clock advances
+    region
+      .put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, 
HConstants.EMPTY_BYTE_ARRAY));
+    region
+      .put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, 
HConstants.EMPTY_BYTE_ARRAY));
+
+    region.put(new Put(row).addColumn(fam1, q6, now, 
HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q7, now, 
HConstants.EMPTY_BYTE_ARRAY));
+    region
+      .put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, 
HConstants.EMPTY_BYTE_ARRAY));
+
+    // Flush so we are sure store scanning gets this right
+    region.flush(true);
+
+    // A query at time T+0 should return all cells
+    checkScan(8);
+
+    // Increment time to T+ttlSecs seconds
+    edge.incrementTime(ttlSecs * 1000);
+    checkScan(3);
+  }
+
+  private void checkScan(int expectCellSize) throws IOException{
+    Scan s = new Scan().withStartRow(row);
+    ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+    ScannerContext scannerContext = contextBuilder.build();
+    RegionScanner scanner = region.getScanner(s);
+    List<Cell> kvs = new ArrayList<>();
+    scanner.next(kvs, scannerContext);
+    assertEquals(expectCellSize, kvs.size());
+    scanner.close();
+  }
+
   @Test
   public void testIncrementTimestampsAreMonotonic() throws IOException {
     region = initHRegion(tableName, method, CONF, fam1);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index a8ad4edc289..0a1c95bf892 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -1275,6 +1275,77 @@ public class TestHStore {
     }
   }
 
+  @Test
+  public void testPreventLoopRead() throws Exception {
+    init(this.name.getMethodName());
+    Configuration conf = HBaseConfiguration.create();
+    // use small heart beat cells
+    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] value0 = Bytes.toBytes("value0");
+    byte[] value1 = Bytes.toBytes("value1");
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    init(name.getMethodName(), conf, 
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
+      
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
+      new MyStoreHook() {
+        @Override public long getSmallestReadPoint(HStore store) {
+          return seqId + 3;
+        }
+      });
+    // The cells having the value0 will be expired
+    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), 
memStoreSizing);
+    store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), 
memStoreSizing);
+
+    List<Cell> myList = new ArrayList<>();
+    Scan scan = new Scan().withStartRow(r0);
+    ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
+    // test normal scan, should return all the cells
+    ScannerContext scannerContext = contextBuilder.build();
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, 
null,
+      seqId + 3)) {
+      scanner.next(myList, scannerContext);
+      assertEquals(6, myList.size());
+    }
+
+    // once the TTL passes, next() skips two expired cells and returns empty results; prevent loop skip is on by default
+    edge.incrementTime(10 * 1000);
+    scannerContext = contextBuilder.build();
+    myList.clear();
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, 
null,
+      seqId + 3)) {
+      // r0
+      scanner.next(myList, scannerContext);
+      assertEquals(0, myList.size());
+    }
+
+    // should scan all non-ttl expired cells by iterative next
+    int resultCells = 0;
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, 
null,
+      seqId + 3)) {
+      boolean hasMore = true;
+      while (hasMore) {
+        myList.clear();
+        hasMore = scanner.next(myList, scannerContext);
+        assertTrue(myList.size() < 6);
+        resultCells += myList.size();
+      }
+      for (Cell c : myList) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + 
Bytes
+          .toStringBinary(actualValue), Bytes.equals(actualValue, value1));
+      }
+    }
+    assertEquals(2, resultCells);
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, 
InterruptedException {
     Configuration conf = HBaseConfiguration.create();

Reply via email to