This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 1571f985cb2 RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (#6464)
1571f985cb2 is described below
commit 1571f985cb269784a1b5b747d039cb5e152b7d3a
Author: Peng Lu <[email protected]>
AuthorDate: Fri Nov 15 19:02:08 2024 +0800
RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (#6464)
Signed-off-by: Wellington Chevreuil <[email protected]>
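In short: while collecting MOB references, the chore called sf.initReader() and then unconditionally sf.closeStoreFile(true) on every store file. If a compaction thread was using the same StoreFileReader, the chore closed it out from under the compaction. The fix synchronizes on the HStoreFile and closes the reader only when the chore itself created it.

For reviewers skimming the diff, here is the guarded pattern the patch introduces, distilled into a standalone helper. This is an illustrative sketch only, not the verbatim patch: the real change inlines this logic in the chore loop and also reads HStoreFile.BULKLOAD_TASK_KEY.

    // Sketch only. 'sf' is an HStoreFile whose reader may be shared
    // with a concurrently running compaction.
    static byte[] readMobRefs(HStoreFile sf) throws IOException {
      if (sf.getReader() != null) {
        // Reader already open elsewhere (e.g. a compaction): read the
        // metadata but leave the reader alone.
        return sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
      }
      synchronized (sf) {
        // Re-check under the lock: another thread may have opened the
        // reader between the unsynchronized check and monitor entry.
        boolean needCreateReader = sf.getReader() == null;
        sf.initReader();
        byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
        if (needCreateReader) {
          // Close only the reader this thread created, avoiding the leak
          // without pulling the reader out from under another thread.
          sf.closeStoreFile(true);
        }
        return mobRefData;
      }
    }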
---
.../hadoop/hbase/mob/RSMobFileCleanerChore.java | 29 +++++++--
.../hbase/mob/TestRSMobFileCleanerChore.java | 73 ++++++++++++++++++++--
2 files changed, 91 insertions(+), 11 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
index 06e34988733..c791a50bdef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
@@ -108,6 +108,10 @@ public class RSMobFileCleanerChore extends ScheduledChore {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+ if (list.isEmpty()) {
+ // The table is not a MOB table, just skip it
+ continue;
+ }
List<HRegion> regions = rs.getRegions(htd.getTableName());
for (HRegion region : regions) {
for (ColumnFamilyDescriptor hcd : list) {
@@ -116,14 +120,27 @@ public class RSMobFileCleanerChore extends ScheduledChore {
Set<String> regionMobs = new HashSet<String>();
Path currentPath = null;
try {
- // collectinng referenced MOBs
+ // collecting referenced MOBs
for (HStoreFile sf : sfs) {
currentPath = sf.getPath();
- sf.initReader();
- byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
- byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
- // close store file to avoid memory leaks
- sf.closeStoreFile(true);
+ byte[] mobRefData = null;
+ byte[] bulkloadMarkerData = null;
+ if (sf.getReader() == null) {
+ synchronized (sf) {
+ boolean needCreateReader = sf.getReader() == null;
+ sf.initReader();
+ mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+ bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+ if (needCreateReader) {
+ // close store file to avoid memory leaks
+ sf.closeStoreFile(true);
+ }
+ }
+ } else {
+ mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+ bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+ }
+
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn(
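Note the double-checked pattern above: needCreateReader is recomputed inside the synchronized block because a compaction may open the reader between the unsynchronized null check and monitor entry. In that case needCreateReader is false and the chore reads the metadata but leaves the reader open, so a reader is only ever closed by the thread that created it.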
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
index 98187631d96..d9470420e99 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -41,8 +44,12 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
@@ -124,15 +131,15 @@ public class TestRSMobFileCleanerChore {
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive
/ 2);
}
- private void loadData(int start, int num) {
+ private void loadData(Table t, int start, int num) {
try {
for (int i = 0; i < num; i++) {
Put p = new Put(Bytes.toBytes(start + i));
p.addColumn(fam, qualifier, mobVal);
- table.put(p);
+ t.put(p);
}
- admin.flush(table.getName());
+ admin.flush(t.getName());
} catch (Exception e) {
LOG.error("MOB file cleaner chore test FAILED", e);
assertTrue(false);
@@ -148,8 +155,8 @@ public class TestRSMobFileCleanerChore {
@Test
public void testMobFileCleanerChore() throws InterruptedException, IOException {
- loadData(0, 10);
- loadData(10, 10);
+ loadData(table, 0, 10);
+ loadData(table, 10, 10);
// loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(2, num);
@@ -225,6 +232,62 @@ public class TestRSMobFileCleanerChore {
assertEquals(20, scanned);
}
+ @Test
+ public void testCleaningAndStoreFileReaderCreatedByOtherThreads()
+ throws IOException, InterruptedException {
+ TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads");
+ ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam)
+ .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
+ TableDescriptor tDesc =
+ TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build();
+ admin.createTable(tDesc);
+ assertTrue(admin.tableExists(testTable));
+
+ // put some data
+ loadData(admin.getConnection().getTable(testTable), 0, 10);
+
+ HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0);
+ HStore store = region.getStore(fam);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ assertEquals(1, store.getStorefiles().size());
+ final HStoreFile sf = storeFiles.iterator().next();
+ assertNotNull(sf);
+ long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam));
+ assertEquals(1, mobFileNum);
+
+ ServerName serverName = null;
+ for (ServerName sn : admin.getRegionServers()) {
+ boolean flag = admin.getRegions(sn).stream().anyMatch(
+ r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString()));
+ if (flag) {
+ serverName = sn;
+ break;
+ }
+ }
+ assertNotNull(serverName);
+ RSMobFileCleanerChore cleanerChore =
+ HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore();
+ CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
+ boolean readerIsNotNull = false;
+ try {
+ sf.initReader();
+ Thread.sleep(1000 * 10);
+ readerIsNotNull = sf.getReader() != null;
+ sf.closeStoreFile(true);
+ } catch (Exception e) {
+ LOG.error("We occur an exception", e);
+ }
+ return readerIsNotNull;
+ });
+ Thread.sleep(100);
+ // The StoreFileReader object was created by another thread
+ cleanerChore.chore();
+ Boolean readerIsNotNull = future.join();
+ assertTrue(readerIsNotNull);
+ admin.disableTable(testTable);
+ admin.deleteTable(testTable);
+ }
+
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
throws IOException {
FileSystem fs = FileSystem.get(conf);
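The new test reproduces the race directly: a background task opens the reader with initReader() and holds it for ten seconds while the main thread runs cleanerChore.chore(). With the old unconditional closeStoreFile(true), sf.getReader() would come back null in the background task and the final assertTrue would fail; with the fix, the chore sees an already-open reader and leaves it alone.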