This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new 85f0291 HBASE-26043 CompactionServer support compact TTL data (#3494)
85f0291 is described below
commit 85f02919da5498399ab727b6273e7b969dde9ad9
Author: niuyulin <[email protected]>
AuthorDate: Mon Jul 26 17:14:38 2021 +0800
HBASE-26043 CompactionServer support compact TTL data (#3494)
Signed-off-by: Duo Zhang <[email protected]>
---
.../compactionserver/CompactionThreadManager.java | 6 ++-
.../apache/hadoop/hbase/regionserver/HStore.java | 8 ++--
.../compactionserver/TestCompactionServer.java | 46 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index bd78cc4..2eb697d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -99,7 +99,8 @@ public class CompactionThreadManager implements ThroughputControllerService {
this.server = server;
try {
this.rootDir = CommonFSUtils.getRootDir(this.conf);
- this.tableDescriptors = new FSTableDescriptors(conf);
+ this.tableDescriptors = new FSTableDescriptors(CommonFSUtils.getCurrentFileSystem(conf),
+ CommonFSUtils.getRootDir(conf), true, false);
int largeThreads =
Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
@@ -213,7 +214,8 @@ public class CompactionThreadManager implements ThroughputControllerService {
tableDescriptors.get(regionInfo.getTable());
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
-
+ // handle TTL case
+ store.removeUnneededFiles(false);
// CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
// opportunity to clean compacted file at that time, we clean compacted files here
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5cd005a..0569ad3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1928,7 +1928,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return Optional.empty();
}
// Before we do compaction, try to get rid of unneeded files to simplify things.
- removeUnneededFiles();
+ removeUnneededFiles(true);
if (region.getRegionServerServices() != null
&& region.getRegionServerServices().isCompactionOffloadEnabled()
@@ -2063,7 +2063,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
}
- private void removeUnneededFiles() throws IOException {
+ public void removeUnneededFiles(boolean writeWalRecord) throws IOException {
if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
return;
}
@@ -2092,7 +2092,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
- writeCompactionWalRecord(delSfs, newFiles);
+ if (writeWalRecord) {
+ writeCompactionWalRecord(delSfs, newFiles);
+ }
replaceStoreFiles(delSfs, newFiles);
refreshStoreSizeAndTotalBytes();
LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 6d0cccc..18b07d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -304,4 +304,50 @@ public class TestCompactionServer {
TEST_UTIL.compact(TABLENAME, false);
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}
+
+ @Test
+ public void testCompactionWithTTL() throws Exception {
+ TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+ ColumnFamilyDescriptor cfd =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setTimeToLive(10).build();
+ TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+ .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
+ TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+ TEST_UTIL.waitTableAvailable(TABLENAME);
+ // generate hfile all kv are expired
+ doPutRecord(1, 500, true);
+ int kVCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+ kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+ }
+ }
+ assertEquals(500, kVCount);
+ // generate hfile mixed with expired and valid KV
+ doPutRecord(1, 500, false);
+ Thread.sleep(10000);
+ doPutRecord(501, 1000, true);
+
+ TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+ TEST_UTIL.compact(TABLENAME, true);
+ TEST_UTIL.waitFor(60000, () -> {
+ int hFileCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ hFileCount += region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+ }
+ return hFileCount == 1;
+ });
+ kVCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+ kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+ }
+ }
+ // To ensure do compaction on compaction server
+ TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+
+ assertEquals(500, kVCount);
+ verifyRecord(1,500, false);
+ verifyRecord(501,1000, true);
+ }
}