This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 1571f985cb2 RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (#6464)
1571f985cb2 is described below

commit 1571f985cb269784a1b5b747d039cb5e152b7d3a
Author: Peng Lu <[email protected]>
AuthorDate: Fri Nov 15 19:02:08 2024 +0800

    RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (#6464)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
---
 .../hadoop/hbase/mob/RSMobFileCleanerChore.java    | 29 +++++++--
 .../hbase/mob/TestRSMobFileCleanerChore.java       | 73 ++++++++++++++++++++--
 2 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
index 06e34988733..c791a50bdef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
@@ -108,6 +108,10 @@ public class RSMobFileCleanerChore extends ScheduledChore {
         // Now clean obsolete files for a table
         LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
         List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+        if (list.isEmpty()) {
+          // The table is not MOB table, just skip it
+          continue;
+        }
         List<HRegion> regions = rs.getRegions(htd.getTableName());
         for (HRegion region : regions) {
           for (ColumnFamilyDescriptor hcd : list) {
@@ -116,14 +120,27 @@ public class RSMobFileCleanerChore extends ScheduledChore {
             Set<String> regionMobs = new HashSet<String>();
             Path currentPath = null;
             try {
-              // collectinng referenced MOBs
+              // collecting referenced MOBs
               for (HStoreFile sf : sfs) {
                 currentPath = sf.getPath();
-                sf.initReader();
-                byte[] mobRefData = 
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-                byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
-                // close store file to avoid memory leaks
-                sf.closeStoreFile(true);
+                byte[] mobRefData = null;
+                byte[] bulkloadMarkerData = null;
+                if (sf.getReader() == null) {
+                  synchronized (sf) {
+                    boolean needCreateReader = sf.getReader() == null;
+                    sf.initReader();
+                    mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                    bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                    if (needCreateReader) {
+                      // close store file to avoid memory leaks
+                      sf.closeStoreFile(true);
+                    }
+                  }
+                } else {
+                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                  bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                }
+
                 if (mobRefData == null) {
                   if (bulkloadMarkerData == null) {
                     LOG.warn(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
index 98187631d96..d9470420e99 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.hbase.mob;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,8 +44,12 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
@@ -124,15 +131,15 @@ public class TestRSMobFileCleanerChore {
     conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive 
/ 2);
   }
 
-  private void loadData(int start, int num) {
+  private void loadData(Table t, int start, int num) {
     try {
 
       for (int i = 0; i < num; i++) {
         Put p = new Put(Bytes.toBytes(start + i));
         p.addColumn(fam, qualifier, mobVal);
-        table.put(p);
+        t.put(p);
       }
-      admin.flush(table.getName());
+      admin.flush(t.getName());
     } catch (Exception e) {
       LOG.error("MOB file cleaner chore test FAILED", e);
       assertTrue(false);
@@ -148,8 +155,8 @@ public class TestRSMobFileCleanerChore {
 
   @Test
   public void testMobFileCleanerChore() throws InterruptedException, 
IOException {
-    loadData(0, 10);
-    loadData(10, 10);
+    loadData(table, 0, 10);
+    loadData(table, 10, 10);
     // loadData(20, 10);
     long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
     assertEquals(2, num);
@@ -225,6 +232,62 @@ public class TestRSMobFileCleanerChore {
     assertEquals(20, scanned);
   }
 
+  @Test
+  public void testCleaningAndStoreFileReaderCreatedByOtherThreads()
+    throws IOException, InterruptedException {
+    TableName testTable = 
TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads");
+    ColumnFamilyDescriptor cfDesc = 
ColumnFamilyDescriptorBuilder.newBuilder(fam)
+      .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
+    TableDescriptor tDesc =
+      
TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build();
+    admin.createTable(tDesc);
+    assertTrue(admin.tableExists(testTable));
+
+    // put some data
+    loadData(admin.getConnection().getTable(testTable), 0, 10);
+
+    HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0);
+    HStore store = region.getStore(fam);
+    Collection<HStoreFile> storeFiles = store.getStorefiles();
+    assertEquals(1, store.getStorefiles().size());
+    final HStoreFile sf = storeFiles.iterator().next();
+    assertNotNull(sf);
+    long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam));
+    assertEquals(1, mobFileNum);
+
+    ServerName serverName = null;
+    for (ServerName sn : admin.getRegionServers()) {
+      boolean flag = admin.getRegions(sn).stream().anyMatch(
+        r -> 
r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString()));
+      if (flag) {
+        serverName = sn;
+        break;
+      }
+    }
+    assertNotNull(serverName);
+    RSMobFileCleanerChore cleanerChore =
+      
HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore();
+    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
+      boolean readerIsNotNull = false;
+      try {
+        sf.initReader();
+        Thread.sleep(1000 * 10);
+        readerIsNotNull = sf.getReader() != null;
+        sf.closeStoreFile(true);
+      } catch (Exception e) {
+        LOG.error("We occur an exception", e);
+      }
+      return readerIsNotNull;
+    });
+    Thread.sleep(100);
+    // The StoreFileReader object was created by another thread
+    cleanerChore.chore();
+    Boolean readerIsNotNull = future.join();
+    assertTrue(readerIsNotNull);
+    admin.disableTable(testTable);
+    admin.deleteTable(testTable);
+  }
+
   private long getNumberOfMobFiles(Configuration conf, TableName tableName, 
String family)
     throws IOException {
     FileSystem fs = FileSystem.get(conf);

Reply via email to