This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 167892c  HBASE-23680 RegionProcedureStore missing cleaning of hfile 
archive (#1022)
167892c is described below

commit 167892ce64143f7d3058ab583b02e1143e8e5217
Author: Duo Zhang <[email protected]>
AuthorDate: Sat Jan 18 20:30:47 2020 +0800

    HBASE-23680 RegionProcedureStore missing cleaning of hfile archive (#1022)
    
    Signed-off-by: stack <[email protected]>
---
 hbase-common/src/main/resources/hbase-default.xml  |  13 ++-
 .../store/ProcedureStorePerformanceEvaluation.java |   4 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |   8 +-
 .../hadoop/hbase/master/cleaner/HFileCleaner.java  |  37 ++++++--
 .../store/region/RegionFlusherAndCompactor.java    |   5 +-
 .../store/region/RegionProcedureStore.java         |  38 ++++++--
 .../RegionProcedureStorePerformanceEvaluation.java |  23 ++++-
 .../store/region/RegionProcedureStoreTestBase.java |  18 +++-
 .../region/RegionProcedureStoreTestHelper.java     |   9 +-
 .../region/TestRegionProcedureStoreCompaction.java | 102 +++++++++++++++++++++
 .../region/TestRegionProcedureStoreMigration.java  |  17 +++-
 .../region/TestRegionProcedureStoreWALCleaner.java |   3 +-
 12 files changed, 243 insertions(+), 34 deletions(-)

diff --git a/hbase-common/src/main/resources/hbase-default.xml 
b/hbase-common/src/main/resources/hbase-default.xml
index 557608f..5b7d29c 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -151,7 +151,18 @@ possible configurations would overwhelm and obscure the 
important.
     so put the cleaner that prunes the most files in front. To
     implement your own BaseHFileCleanerDelegate, just put it in HBase's 
classpath
     and add the fully qualified class name here. Always add the above
-    default log cleaners in the list as they will be overwritten in
+    default hfile cleaners in the list as they will be overwritten in
+    hbase-site.xml.</description>
+  </property>
+  <property>
+    <name>hbase.procedure.store.region.hfilecleaner.plugins</name>
+    
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
+    <description>A comma-separated list of BaseHFileCleanerDelegate invoked by
+    the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
+    called in order, so put the cleaner that prunes the most files in front. To
+    implement your own BaseHFileCleanerDelegate, just put it in HBase's 
classpath
+    and add the fully qualified class name here. Always add the above
+    default hfile cleaners in the list as they will be overwritten in
     hbase-site.xml.</description>
   </property>
   <property>
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
index b4888c5..cb31f02 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
@@ -123,11 +123,15 @@ public abstract class 
ProcedureStorePerformanceEvaluation<T extends ProcedureSto
 
   protected abstract T createProcedureStore(Path storeDir) throws IOException;
 
+  protected void postStop(T store) throws IOException {
+  }
+
   private void tearDownProcedureStore() {
     Path storeDir = null;
     try {
       if (store != null) {
         store.stop(false);
+        postStop(store);
       }
       FileSystem fs = FileSystem.get(conf);
       storeDir = fs.makeQualified(new Path(outputPath));
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 90299e5..939e0dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1413,8 +1413,6 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 
1);
     startProcedureExecutor();
 
-    // Create cleaner thread pool
-    cleanerPool = new DirScanPool(conf);
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, 
DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
@@ -1520,8 +1518,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
   private void createProcedureExecutor() throws IOException {
     MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
-    procedureStore =
-      new RegionProcedureStore(this, new 
MasterProcedureEnv.FsUtilsLeaseRecovery(this));
+    // Create cleaner thread pool
+    cleanerPool = new DirScanPool(conf);
+    procedureStore = new RegionProcedureStore(this, cleanerPool,
+      new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
     procedureStore.registerListener(new ProcedureStoreListener() {
 
       @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 4b50ab4..1747da1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -117,25 +117,42 @@ public class HFileCleaner extends 
CleanerChore<BaseHFileCleanerDelegate>
    */
   public HFileCleaner(final int period, final Stoppable stopper, Configuration 
conf, FileSystem fs,
     Path directory, DirScanPool pool, Map<String, Object> params) {
-    super("HFileCleaner", period, stopper, conf, fs, directory, 
MASTER_HFILE_CLEANER_PLUGINS, pool,
+    this("HFileCleaner", period, stopper, conf, fs, directory, 
MASTER_HFILE_CLEANER_PLUGINS, pool,
       params);
+
+  }
+
+  /**
+   * For creating customized HFileCleaner.
+   * @param name name of the chore being run
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param conf configuration to use
+   * @param fs handle to the FS
+   * @param directory directory to be cleaned
+   * @param confKey configuration key for the classes to instantiate
+   * @param pool the thread pool used to scan directories
+   * @param params params could be used in subclass of BaseHFileCleanerDelegate
+   */
+  public HFileCleaner(String name, int period, Stoppable stopper, 
Configuration conf, FileSystem fs,
+    Path directory, String confKey, DirScanPool pool, Map<String, Object> 
params) {
+    super(name, period, stopper, conf, fs, directory, confKey, pool, params);
     throttlePoint =
-        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, 
DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, 
DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
     largeQueueInitSize =
-        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
     smallQueueInitSize =
-        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, 
DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, 
smallQueueInitSize, COMPARATOR);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     largeFileDeleteThreadNumber =
-        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
     smallFileDeleteThreadNumber =
-        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, 
DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
     cleanerThreadTimeoutMsec =
-        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, 
DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
-    cleanerThreadCheckIntervalMsec =
-        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
-            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
+      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, 
DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec = 
conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
     startHFileDeleteThreads();
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
index 8e3ffa3..57e62dd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -120,8 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
     flushThread.start();
     compactExecutor = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()
       
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
-    LOG.info("Constructor flushSize={}, flushPerChanges={}, 
flushIntervalMs={}, " +
-        "compactMin={}", flushSize, flushPerChanges, flushIntervalMs, 
compactMin);
+    LOG.info("Constructor flushSize={}, flushPerChanges={}, 
flushIntervalMs={}, compactMin={}",
+      flushSize, flushPerChanges, flushIntervalMs, compactMin);
   }
 
   // inject our flush related configurations
@@ -139,6 +139,7 @@ class RegionFlusherAndCompactor implements Closeable {
   private void compact() {
     try {
       region.compact(true);
+      
Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
     } catch (IOException e) {
       LOG.error("Failed to compact procedure store region", e);
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index bd86cfc..d153508 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -48,9 +48,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
 import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
 import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -119,7 +123,7 @@ public class RegionProcedureStore extends 
ProcedureStoreBase {
 
   static final String MASTER_PROCEDURE_DIR = "MasterProcs";
 
-  static final String LOGCLEANER_PLUGINS = 
"hbase.procedure.store.region.logcleaner.plugins";
+  static final String HFILECLEANER_PLUGINS = 
"hbase.procedure.store.region.hfilecleaner.plugins";
 
   private static final String REPLAY_EDITS_DIR = "recovered.wals";
 
@@ -138,22 +142,31 @@ public class RegionProcedureStore extends 
ProcedureStoreBase {
 
   private final Server server;
 
+  private final DirScanPool cleanerPool;
+
   private final LeaseRecovery leaseRecovery;
 
+  // Used to delete the compacted hfiles. Since we put all data on WAL 
filesystem, it is not
+  // possible to move the compacted hfiles to the global hfile archive 
directory, we have to do it
+  // by ourselves.
+  private HFileCleaner cleaner;
+
   private WALFactory walFactory;
 
   @VisibleForTesting
   HRegion region;
 
-  private RegionFlusherAndCompactor flusherAndCompactor;
+  @VisibleForTesting
+  RegionFlusherAndCompactor flusherAndCompactor;
 
   @VisibleForTesting
   RegionProcedureStoreWALRoller walRoller;
 
   private int numThreads;
 
-  public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
+  public RegionProcedureStore(Server server, DirScanPool cleanerPool, 
LeaseRecovery leaseRecovery) {
     this.server = server;
+    this.cleanerPool = cleanerPool;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -193,6 +206,9 @@ public class RegionProcedureStore extends 
ProcedureStoreBase {
       return;
     }
     LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
+    if (cleaner != null) {
+      cleaner.cancel(abort);
+    }
     if (flusherAndCompactor != null) {
       flusherAndCompactor.close();
     }
@@ -423,11 +439,11 @@ public class RegionProcedureStore extends 
ProcedureStoreBase {
     } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
       LOG.warn("The WALProcedureStore max pid is less than the max pid of all 
loaded procedures");
     }
+    store.stop(false);
     if (!fs.delete(procWALDir, true)) {
-      throw new IOException("Failed to delete the WALProcedureStore migrated 
proc wal directory " +
-        procWALDir);
+      throw new IOException(
+        "Failed to delete the WALProcedureStore migrated proc wal directory " 
+ procWALDir);
     }
-    store.stop(true);
     LOG.info("Migration of WALProcedureStore finished");
   }
 
@@ -463,6 +479,16 @@ public class RegionProcedureStore extends 
ProcedureStoreBase {
     }
     flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
     walRoller.setFlusherAndCompactor(flusherAndCompactor);
+    int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
+      HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    if (!fs.mkdirs(archiveDir)) {
+      LOG.warn("Failed to create archive directory {}. Usually this should not 
happen but it will" +
+        " be created again when we actually archive the hfiles later, so 
continue", archiveDir);
+    }
+    cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", 
cleanerInterval, server, conf,
+      fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, 
Collections.emptyMap());
+    server.getChoreService().scheduleChore(cleaner);
     tryMigrate(fs);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
index f3ab2e5..92be897 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -46,21 +47,29 @@ public class RegionProcedureStorePerformanceEvaluation
     private final ServerName serverName =
       ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
 
+    private final ChoreService choreService;
+
+    private volatile boolean abort = false;
+
     public MockServer(Configuration conf) {
       this.conf = conf;
+      this.choreService = new ChoreService("Cleaner-Chore-Service");
     }
 
     @Override
     public void abort(String why, Throwable e) {
+      abort = true;
+      choreService.shutdown();
     }
 
     @Override
     public boolean isAborted() {
-      return false;
+      return abort;
     }
 
     @Override
     public void stop(String why) {
+      choreService.shutdown();
     }
 
     @Override
@@ -105,10 +114,12 @@ public class RegionProcedureStorePerformanceEvaluation
 
     @Override
     public ChoreService getChoreService() {
-      throw new UnsupportedOperationException();
+      return choreService;
     }
   }
 
+  private DirScanPool cleanerPool;
+
   @Override
   protected RegionProcedureStore createProcedureStore(Path storeDir) throws 
IOException {
     Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
@@ -123,7 +134,8 @@ public class RegionProcedureStorePerformanceEvaluation
       initialCountPercentage, null);
     conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, 
"hsync".equals(syncType));
     CommonFSUtils.setRootDir(conf, storeDir);
-    return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
+    cleanerPool = new DirScanPool(conf);
+    return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, 
apth) -> {
     });
   }
 
@@ -138,6 +150,11 @@ public class RegionProcedureStorePerformanceEvaluation
   protected void preWrite(long procId) throws IOException {
   }
 
+  @Override
+  protected void postStop(RegionProcedureStore store) throws IOException {
+    cleanerPool.shutdownNow();
+  }
+
   public static void main(String[] args) throws IOException {
     RegionProcedureStorePerformanceEvaluation tool =
       new RegionProcedureStorePerformanceEvaluation();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
index 6f07805..c5694d2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hbase.procedure2.store.region;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -32,18 +35,31 @@ public class RegionProcedureStoreTestBase {
 
   protected RegionProcedureStore store;
 
+  protected ChoreService choreService;
+
+  protected DirScanPool cleanerPool;
+
+  protected void configure(Configuration conf) {
+  }
+
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
     htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    configure(htu.getConfiguration());
     Path testDir = htu.getDataTestDir();
     CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
-    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), 
new LoadCounter());
+    choreService = new ChoreService(getClass().getSimpleName());
+    cleanerPool = new DirScanPool(htu.getConfiguration());
+    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), 
choreService,
+      cleanerPool, new LoadCounter());
   }
 
   @After
   public void tearDown() throws IOException {
     store.stop(true);
+    cleanerPool.shutdownNow();
+    choreService.shutdown();
     htu.cleanupTestDir();
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
index d550e7f..5497b8a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -24,8 +24,10 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
 
@@ -34,13 +36,14 @@ final class RegionProcedureStoreTestHelper {
   private RegionProcedureStoreTestHelper() {
   }
 
-  static RegionProcedureStore createStore(Configuration conf, ProcedureLoader 
loader)
-    throws IOException {
+  static RegionProcedureStore createStore(Configuration conf, ChoreService 
choreService,
+    DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
     Server server = mock(Server.class);
     when(server.getConfiguration()).thenReturn(conf);
     when(server.getServerName())
       .thenReturn(ServerName.valueOf("localhost", 12345, 
System.currentTimeMillis()));
-    RegionProcedureStore store = new RegionProcedureStore(server, new 
LeaseRecovery() {
+    when(server.getChoreService()).thenReturn(choreService);
+    RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, 
new LeaseRecovery() {
 
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws 
IOException {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
new file mode 100644
index 0000000..15682bb
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreCompaction extends 
RegionProcedureStoreTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
+
+  private int compactMin = 4;
+
+  @Override
+  protected void configure(Configuration conf) {
+    conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
+    conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
+  }
+
+  private int getStorefilesCount() {
+    return 
Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    for (int i = 0; i < compactMin - 1; i++) {
+      store.insert(new RegionProcedureStoreTestProcedure(), null);
+      store.region.flush(true);
+    }
+    assertEquals(compactMin - 1, getStorefilesCount());
+    store.insert(new RegionProcedureStoreTestProcedure(), null);
+    store.flusherAndCompactor.requestFlush();
+    htu.waitFor(15000, () -> getStorefilesCount() == 1);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
+      new Path(htu.getDataTestDir(), 
RegionProcedureStore.MASTER_PROCEDURE_DIR),
+      store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
+    FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
+    // after compaction, the old hfiles should have been compacted
+    htu.waitFor(15000, () -> {
+      try {
+        FileStatus[] fses = fs.listStatus(storeArchiveDir);
+        return fses != null && fses.length == compactMin;
+      } catch (FileNotFoundException e) {
+        return false;
+      }
+    });
+    // ttl has not expired, so should not delete any files
+    Thread.sleep(1000);
+    FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
+    assertEquals(4, compactedHFiles.length);
+    Thread.sleep(2000);
+    // touch one file
+    long currentTime = System.currentTimeMillis();
+    fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
+    Thread.sleep(3000);
+    // only the touched file is still there after clean up
+    FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
+    assertEquals(1, remainingHFiles.length);
+    assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
+    Thread.sleep(6000);
+    // the touched file should also be cleaned up and then the cleaner will 
delete the parent
+    // directory since it is empty.
+    assertFalse(fs.exists(storeArchiveDir));
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
index 44d7daa..9a49361 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -32,12 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -67,6 +69,10 @@ public class TestRegionProcedureStoreMigration {
 
   private WALProcedureStore walStore;
 
+  private ChoreService choreService;
+
+  private DirScanPool cleanerPool;
+
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
@@ -83,6 +89,8 @@ public class TestRegionProcedureStoreMigration {
     walStore.start(1);
     walStore.recoverLease();
     walStore.load(new LoadCounter());
+    choreService = new ChoreService(getClass().getSimpleName());
+    cleanerPool = new DirScanPool(htu.getConfiguration());
   }
 
   @After
@@ -91,6 +99,8 @@ public class TestRegionProcedureStoreMigration {
       store.stop(true);
     }
     walStore.stop(true);
+    cleanerPool.shutdownNow();
+    choreService.shutdown();
     htu.cleanupTestDir();
   }
 
@@ -109,8 +119,8 @@ public class TestRegionProcedureStoreMigration {
     SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
       new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
     MutableLong maxProcIdSet = new MutableLong(0);
-    store =
-      RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new 
ProcedureLoader() {
+    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), 
choreService,
+      cleanerPool, new ProcedureLoader() {
 
         @Override
         public void setMaxProcId(long maxProcId) {
@@ -156,7 +166,8 @@ public class TestRegionProcedureStoreMigration {
     walStore.stop(true);
 
     try {
-      store = 
RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new 
LoadCounter());
+      store = 
RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+        cleanerPool, new LoadCounter());
       fail("Should fail since AssignProcedure is not supported");
     } catch (HBaseIOException e) {
       assertThat(e.getMessage(), startsWith("Unsupported"));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
index 826c763..db49942 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
@@ -95,7 +95,8 @@ public class TestRegionProcedureStoreWALCleaner {
       }
     }, conf, fs, globalWALArchiveDir, dirScanPool);
     choreService.scheduleChore(logCleaner);
-    store = RegionProcedureStoreTestHelper.createStore(conf, new 
LoadCounter());
+    store = RegionProcedureStoreTestHelper.createStore(conf, choreService, 
dirScanPool,
+      new LoadCounter());
   }
 
   @After

Reply via email to