This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cf728da5a0 HBASE-26969:Eliminate MOB renames when SFT is enabled (#4418)
5cf728da5a0 is described below

commit 5cf728da5a0fea5fd94bb31f06c9a43158ace6b4
Author: BukrosSzabolcs <[email protected]>
AuthorDate: Tue Jun 21 11:18:55 2022 +0200

    HBASE-26969:Eliminate MOB renames when SFT is enabled (#4418)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  29 ++-
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |  16 +-
 .../hadoop/hbase/mob/MobFileCleanerChore.java      |  44 ++--
 .../java/org/apache/hadoop/hbase/mob/MobUtils.java |  31 ++-
 .../hadoop/hbase/mob/RSMobFileCleanerChore.java    | 272 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/HMobStore.java       |  39 ++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  15 ++
 .../apache/hadoop/hbase/regionserver/HStore.java   |   2 +-
 .../hbase/regionserver/compactions/Compactor.java  |  23 +-
 .../hadoop/hbase/mob/FaultyMobStoreCompactor.java  |   6 +-
 .../org/apache/hadoop/hbase/mob/MobTestUtil.java   |  22 ++
 .../hbase/mob/TestDefaultMobStoreFlusher.java      |  38 ++-
 .../hadoop/hbase/mob/TestMobCompactionOptMode.java |  11 +-
 .../mob/TestMobCompactionOptRegionBatchMode.java   |  14 +-
 .../TestMobCompactionRegularRegionBatchMode.java   |  13 +-
 .../hbase/mob/TestMobCompactionWithDefaults.java   |  57 +++--
 .../hadoop/hbase/mob/TestMobFileCleanerChore.java  |  29 ++-
 .../hadoop/hbase/mob/TestMobStoreCompaction.java   |  29 ++-
 .../org/apache/hadoop/hbase/mob/TestMobUtils.java  |   5 +
 ...erChore.java => TestRSMobFileCleanerChore.java} |  85 +++++--
 20 files changed, 664 insertions(+), 116 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 57a991e45f3..f568af0b19a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -293,20 +293,20 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
    * @param cleanSeqId           When true, remove seqId(used to be mvcc) 
value which is <=
    *                             smallestReadPoint
    * @param throughputController The compaction throughput controller.
-   * @param major                Is a major compaction.
-   * @param numofFilesToCompact  the number of files to compact
+   * @param request              compaction request.
    * @param progress             Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any 
reason.
    */
   @Override
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
     mobRefSet.get().clear();
     boolean isUserRequest = userRequest.get();
+    boolean major = request.isAllFiles();
     boolean compactMOBs = major && isUserRequest;
     boolean discardMobMiss = 
conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
       MobConstants.DEFAULT_MOB_DISCARD_MISS);
@@ -350,12 +350,12 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
 
     Cell mobCell = null;
     try {
 
-      mobFileWriter = newMobWriter(fd, major);
+      mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
       fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
 
       do {
@@ -435,7 +435,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
                       LOG.debug("Closing output MOB File, length={} file={}, 
store={}", len,
                         mobFileWriter.getPath().getName(), getStoreInfo());
                       commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, 
mobCells, major);
-                      mobFileWriter = newMobWriter(fd, major);
+                      mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                       fileName = 
Bytes.toBytes(mobFileWriter.getPath().getName());
                       mobCells = 0;
                     }
@@ -479,7 +479,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
                   long len = mobFileWriter.getPos();
                   if (len > maxMobFileSize) {
                     commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, 
mobCells, major);
-                    mobFileWriter = newMobWriter(fd, major);
+                    mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                     fileName = 
Bytes.toBytes(mobFileWriter.getPath().getName());
                     mobCells = 0;
                   }
@@ -531,7 +531,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
               long len = mobFileWriter.getPos();
               if (len > maxMobFileSize) {
                 commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, 
major);
-                mobFileWriter = newMobWriter(fd, major);
+                mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                 fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                 mobCells = 0;
               }
@@ -617,11 +617,16 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     }
   }
 
-  private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws 
IOException {
+  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
+    Consumer<Path> writerCreationTracker) throws IOException {
     try {
-      StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new 
Date(fd.latestPutTs),
-        fd.maxKeyCount, major ? majorCompactionCompression : 
minorCompactionCompression,
-        store.getRegionInfo().getStartKey(), true);
+      StoreFileWriter mobFileWriter = 
mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+          major ? majorCompactionCompression : minorCompactionCompression,
+          store.getRegionInfo().getStartKey(), true)
+        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
+          major ? majorCompactionCompression : minorCompactionCompression,
+          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
       LOG.debug("New MOB writer created={} store={}", 
mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a7f2ecdf242..6df17e58e22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
         try {
           // It's a mob store, flush the cells in a mob way. This is the 
difference of flushing
           // between a normal and a mob store.
-          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, 
throughputController);
+          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, 
throughputController,
+            writerCreationTracker);
         } catch (IOException ioe) {
           e = ioe;
           // throw the exception out
@@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
    */
   protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
     InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
-    ThroughputController throughputController) throws IOException {
+    ThroughputController throughputController, Consumer<Path> 
writerCreationTracker)
+    throws IOException {
     StoreFileWriter mobFileWriter = null;
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, 
HConstants.COMPACTION_KV_MAX_DEFAULT);
     long mobCount = 0;
     long mobSize = 0;
     long time = snapshot.getTimeRangeTracker().getMax();
-    mobFileWriter = mobStore.createWriterInTmp(new Date(time), 
snapshot.getCellsCount(),
-      store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
-      false);
+    mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+      ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
+        store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
+        false)
+      : mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
+        store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
+        false, writerCreationTracker);
     // the target path is {tableName}/.mob/{cfName}/mobFiles
     // the relative path is mobFiles
     byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 1258a17a8eb..2c78c6f5ac7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -160,14 +160,15 @@ public class MobFileCleanerChore extends ScheduledChore {
           maxCreationTimeToArchive, table);
       }
 
+      FileSystem fs = FileSystem.get(conf);
+      Set<String> regionNames = new HashSet<>();
       Path rootDir = CommonFSUtils.getRootDir(conf);
       Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
-      // How safe is this call?
-      List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), 
tableDir);
+      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
 
       Set<String> allActiveMobFileName = new HashSet<String>();
-      FileSystem fs = FileSystem.get(conf);
       for (Path regionPath : regionDirs) {
+        regionNames.add(regionPath.getName());
         for (ColumnFamilyDescriptor hcd : list) {
           String family = hcd.getNameAsString();
           Path storePath = new Path(regionPath, family);
@@ -195,13 +196,26 @@ public class MobFileCleanerChore extends ScheduledChore {
               for (Path pp : storeFiles) {
                 currentPath = pp;
                 LOG.trace("Store file: {}", pp);
-                HStoreFile sf =
-                  new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
-                sf.initReader();
-                byte[] mobRefData = 
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-                byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
-                // close store file to avoid memory leaks
-                sf.closeStoreFile(true);
+                HStoreFile sf = null;
+                byte[] mobRefData = null;
+                byte[] bulkloadMarkerData = null;
+                try {
+                  sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
+                  sf.initReader();
+                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                  bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                  // close store file to avoid memory leaks
+                  sf.closeStoreFile(true);
+                } catch (IOException ex) {
+                  // When FileBased SFT is active the store dir can contain 
corrupted or incomplete
+                  // files. So read errors are expected. We just skip these 
files.
+                  if (ex instanceof FileNotFoundException) {
+                    throw ex;
+                  }
+                  LOG.debug("Failed to get mob data from file: {} due to 
error.", pp.toString(),
+                    ex);
+                  continue;
+                }
                 if (mobRefData == null) {
                   if (bulkloadMarkerData == null) {
                     LOG.warn("Found old store file with no MOB_FILE_REFS: {} - 
"
@@ -256,9 +270,11 @@ public class MobFileCleanerChore extends ScheduledChore {
         while (rit.hasNext()) {
           LocatedFileStatus lfs = rit.next();
           Path p = lfs.getPath();
-          if (!allActiveMobFileName.contains(p.getName())) {
-            // MOB is not in a list of active references, but it can be too
-            // fresh, skip it in this case
+          String[] mobParts = p.getName().split("_");
+          String regionName = mobParts[mobParts.length - 1];
+
+          if (!regionNames.contains(regionName)) {
+            // MOB belonged to a region no longer hosted
             long creationTime = fs.getFileStatus(p).getModificationTime();
             if (creationTime < maxCreationTimeToArchive) {
               LOG.trace("Archiving MOB file {} creation time={}", p,
@@ -269,7 +285,7 @@ public class MobFileCleanerChore extends ScheduledChore {
                 fs.getFileStatus(p).getModificationTime());
             }
           } else {
-            LOG.trace("Keeping active MOB file: {}", p);
+            LOG.trace("Keeping MOB file with existing region: {}", p);
           }
         }
         LOG.info(" MOB Cleaner found {} files to archive for table={} 
family={}", toArchive.size(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 5e2ee9eb411..43cf4255235 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -584,6 +585,33 @@ public final class MobUtils {
     CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType 
checksumType,
     int bytesPerChecksum, int blocksize, BloomType bloomType, boolean 
isCompaction)
     throws IOException {
+    return createWriter(conf, fs, family, path, maxKeyCount, compression, 
cacheConfig,
+      cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, 
isCompaction, null);
+  }
+
+  /**
+   * Creates a writer for the mob file in temp directory.
+   * @param conf                  The current configuration.
+   * @param fs                    The current file system.
+   * @param family                The descriptor of the current column family.
+   * @param path                  The path for a temp directory.
+   * @param maxKeyCount           The key count.
+   * @param compression           The compression algorithm.
+   * @param cacheConfig           The current cache config.
+   * @param cryptoContext         The encryption context.
+   * @param checksumType          The checksum type.
+   * @param bytesPerChecksum      The bytes per checksum.
+   * @param blocksize             The HFile block size.
+   * @param bloomType             The bloom filter type.
+   * @param isCompaction          If the writer is used in compaction.
+   * @param writerCreationTracker to track the current writer in the store
+   * @return The writer for the mob file.
+   */
+  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
+    ColumnFamilyDescriptor family, Path path, long maxKeyCount, 
Compression.Algorithm compression,
+    CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType 
checksumType,
+    int bytesPerChecksum, int blocksize, BloomType bloomType, boolean 
isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
     if (compression == null) {
       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
     }
@@ -602,7 +630,8 @@ public final class MobUtils {
       .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
 
     StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, 
fs).withFilePath(path)
-      
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+      
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
+      .withWriterCreationTracker(writerCreationTracker).build();
     return w;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
new file mode 100644
index 00000000000..06e34988733
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
+/**
+ * The class RSMobFileCleanerChore for running cleaner regularly to remove the 
obsolete (files which
+ * have no active references to) mob files that were referenced from the 
current RS.
+ */
[email protected]
+public class RSMobFileCleanerChore extends ScheduledChore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RSMobFileCleanerChore.class);
+  private final HRegionServer rs;
+
+  public RSMobFileCleanerChore(HRegionServer rs) {
+    super(rs.getServerName() + "-MobFileCleanerChore", rs,
+      rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
+      Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+        MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
+        * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
+      TimeUnit.SECONDS);
+    // to prevent a load spike on the fs the initial delay is modified by +/- 
50%
+    this.rs = rs;
+  }
+
+  public RSMobFileCleanerChore() {
+    this.rs = null;
+  }
+
+  @Override
+  protected void chore() {
+
+    long minAgeToArchive = 
rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
+      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
+    // We check only those MOB files, which creation time is less
+    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
+    // gives us full confidence that all corresponding store files will
+    // exist at the time cleaning procedure begins and will be examined.
+    // So, if MOB file creation time is greater than this maxTimeToArchive,
+    // this will be skipped and won't be archived.
+    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - 
minAgeToArchive;
+
+    TableDescriptors htds = rs.getTableDescriptors();
+    try {
+      FileSystem fs = FileSystem.get(rs.getConfiguration());
+
+      Map<String, TableDescriptor> map = null;
+      try {
+        map = htds.getAll();
+      } catch (IOException e) {
+        LOG.error("MobFileCleanerChore failed", e);
+        return;
+      }
+      Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
+      for (TableDescriptor htd : map.values()) {
+        // Now clean obsolete files for a table
+        LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
+        List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+        List<HRegion> regions = rs.getRegions(htd.getTableName());
+        for (HRegion region : regions) {
+          for (ColumnFamilyDescriptor hcd : list) {
+            HStore store = region.getStore(hcd.getName());
+            Collection<HStoreFile> sfs = store.getStorefiles();
+            Set<String> regionMobs = new HashSet<String>();
+            Path currentPath = null;
+            try {
+              // collecting referenced MOBs
+              for (HStoreFile sf : sfs) {
+                currentPath = sf.getPath();
+                sf.initReader();
+                byte[] mobRefData = 
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                // close store file to avoid memory leaks
+                sf.closeStoreFile(true);
+                if (mobRefData == null) {
+                  if (bulkloadMarkerData == null) {
+                    LOG.warn(
+                      "Found old store file with no MOB_FILE_REFS: {} - "
+                        + "can not proceed until all old files will be 
MOB-compacted.",
+                      currentPath);
+                    return;
+                  } else {
+                    LOG.debug("Skipping file without MOB references 
(bulkloaded file):{}",
+                      currentPath);
+                    continue;
+                  }
+                }
+                // file may or may not have MOB references, but was created by 
the distributed
+                // mob compaction code.
+                try {
+                  SetMultimap<TableName, String> mobs =
+                    MobUtils.deserializeMobFileRefs(mobRefData).build();
+                  LOG.debug("Found {} mob references for store={}", 
mobs.size(), sf);
+                  LOG.trace("Specific mob references found for store={} : {}", 
sf, mobs);
+                  regionMobs.addAll(mobs.values());
+                } catch (RuntimeException exception) {
+                  throw new IOException("failure getting mob references for 
hfile " + sf,
+                    exception);
+                }
+              }
+              // collecting files, MOB included currently being written
+              regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
+                .map(path -> path.getName()).collect(Collectors.toList()));
+
+              referencedMOBs
+                .computeIfAbsent(hcd.getNameAsString(), cf -> new 
HashMap<String, List<String>>())
+                .computeIfAbsent(region.getRegionInfo().getEncodedName(), name 
-> new ArrayList<>())
+                .addAll(regionMobs);
+
+            } catch (FileNotFoundException e) {
+              LOG.warn(
+                "Missing file:{} Starting MOB cleaning cycle from the 
beginning" + " due to error",
+                currentPath, e);
+              regionMobs.clear();
+              continue;
+            } catch (IOException e) {
+              LOG.error("Failed to clean the obsolete mob files for table={}",
+                htd.getTableName().getNameAsString(), e);
+            }
+          }
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found: {} active mob refs for table={}",
+            referencedMOBs.values().stream().map(inner -> inner.values())
+              .flatMap(lists -> lists.stream()).mapToInt(lists -> 
lists.size()).sum(),
+            htd.getTableName().getNameAsString());
+        }
+        if (LOG.isTraceEnabled()) {
+          referencedMOBs.values().stream().forEach(innerMap -> 
innerMap.values().stream()
+            .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
+        }
+
+        // collect regions referencing MOB files belonging to the current rs
+        Set<String> regionsCovered = new HashSet<>();
+        referencedMOBs.values().stream()
+          .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
+
+        for (ColumnFamilyDescriptor hcd : list) {
+          List<Path> toArchive = new ArrayList<Path>();
+          String family = hcd.getNameAsString();
+          Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), 
htd.getTableName(), family);
+          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
+          while (rit.hasNext()) {
+            LocatedFileStatus lfs = rit.next();
+            Path p = lfs.getPath();
+            String[] mobParts = p.getName().split("_");
+            String regionName = mobParts[mobParts.length - 1];
+
+            // skip MOB files not belonging to a region assigned to the 
current rs
+            if (!regionsCovered.contains(regionName)) {
+              LOG.trace("MOB file does not belong to current rs: {}", p);
+              continue;
+            }
+
+            // check active or actively written mob files
+            Map<String, List<String>> cfMobs = 
referencedMOBs.get(hcd.getNameAsString());
+            if (
+              cfMobs != null && cfMobs.get(regionName) != null
+                && cfMobs.get(regionName).contains(p.getName())
+            ) {
+              LOG.trace("Keeping active MOB file: {}", p);
+              continue;
+            }
+
+            // MOB is not in a list of active references, but it can be too
+            // fresh, skip it in this case
+            long creationTime = fs.getFileStatus(p).getModificationTime();
+            if (creationTime < maxCreationTimeToArchive) {
+              LOG.trace("Archiving MOB file {} creation time={}", p,
+                (fs.getFileStatus(p).getModificationTime()));
+              toArchive.add(p);
+            } else {
+              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
+                fs.getFileStatus(p).getModificationTime());
+            }
+
+          }
+          LOG.info(" MOB Cleaner found {} files to archive for table={} 
family={}",
+            toArchive.size(), htd.getTableName().getNameAsString(), family);
+          archiveMobFiles(rs.getConfiguration(), htd.getTableName(), 
family.getBytes(), toArchive);
+          LOG.info(" MOB Cleaner archived {} files, table={} family={}", 
toArchive.size(),
+            htd.getTableName().getNameAsString(), family);
+        }
+
+        LOG.info("Cleaning obsolete MOB files finished for table={}", 
htd.getTableName());
+
+      }
+    } catch (IOException e) {
+      LOG.error("MOB Cleaner failed when trying to access the file system", e);
+    }
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf       The current configuration.
+   * @param tableName  The table name.
+   * @param family     The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException exception
+   */
+  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] 
family,
+    List<Path> storeFiles) throws IOException {
+
+    if (storeFiles.size() == 0) {
+      // nothing to remove
+      LOG.debug("Skipping archiving old MOB files - no files found for 
table={} cf={}", tableName,
+        Bytes.toString(family));
+      return;
+    }
+    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), 
tableName);
+    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
+
+    for (Path p : storeFiles) {
+      LOG.debug("MOB Cleaner is archiving: {}", p);
+      HFileArchiver.archiveStoreFile(conf, fs, 
MobUtils.getMobRegionInfo(tableName), mobTableDir,
+        family, p);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 931357a9fc5..13b7cc022bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -184,7 +185,27 @@ public class HMobStore extends HStore {
     }
     Path path = getTempDir();
     return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, 
compression, startKey,
-      isCompaction);
+      isCompaction, null);
+  }
+
+  /**
+   * Creates the writer for the mob file in the mob family directory.
+   * @param date         The latest date of written cells.
+   * @param maxKeyCount  The key count.
+   * @param compression  The compression algorithm.
+   * @param startKey     The start key.
+   * @param isCompaction If the writer is used in compaction.
+   * @return The writer for the mob file. n
+   */
+  public StoreFileWriter createWriter(Date date, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
+    if (startKey == null) {
+      startKey = HConstants.EMPTY_START_ROW;
+    }
+    Path path = getPath();
+    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, 
compression, startKey,
+      isCompaction, writerCreationTracker);
   }
 
   /**
@@ -198,11 +219,13 @@ public class HMobStore extends HStore {
    * @return The writer for the mob file. n
    */
   public StoreFileWriter createWriterInTmp(String date, Path basePath, long 
maxKeyCount,
-    Compression.Algorithm compression, byte[] startKey, boolean isCompaction) 
throws IOException {
+    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
     MobFileName mobFileName =
       MobFileName.create(startKey, date, 
UUID.randomUUID().toString().replaceAll("-", ""),
         getHRegion().getRegionInfo().getEncodedName());
-    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, 
isCompaction);
+    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, 
isCompaction,
+      writerCreationTracker);
   }
 
   /**
@@ -214,13 +237,15 @@ public class HMobStore extends HStore {
    * @param isCompaction If the writer is used in compaction.
    * @return The writer for the mob file. n
    */
+
   public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path 
basePath, long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction) throws 
IOException {
+    Compression.Algorithm compression, boolean isCompaction, Consumer<Path> 
writerCreationTracker)
+    throws IOException {
     return MobUtils.createWriter(conf, getFileSystem(), 
getColumnFamilyDescriptor(),
       new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, 
getCacheConfig(),
       getStoreContext().getEncryptionContext(), 
StoreUtils.getChecksumType(conf),
       StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), 
BloomType.NONE,
-      isCompaction);
+      isCompaction, writerCreationTracker);
   }
 
   /**
@@ -234,6 +259,10 @@ public class HMobStore extends HStore {
     }
     Path dstPath = new Path(targetPath, sourceFile.getName());
     validateMobFile(sourceFile);
+    if (sourceFile.equals(targetPath)) {
+      LOG.info("File is already in the destination dir: {}", sourceFile);
+      return;
+    }
     LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, 
dstPath);
     Path parent = dstPath.getParent();
     if (!getFileSystem().exists(parent)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1865929dc5e..e79f4bec612 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -108,6 +108,7 @@ import 
org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
@@ -438,6 +439,8 @@ public class HRegionServer extends 
HBaseServerBase<RSRpcServices>
 
   private BrokenStoreFileCleaner brokenStoreFileCleaner;
 
+  private RSMobFileCleanerChore rsMobFileCleanerChore;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -1898,6 +1901,10 @@ public class HRegionServer extends 
HBaseServerBase<RSRpcServices>
       choreService.scheduleChore(brokenStoreFileCleaner);
     }
 
+    if (this.rsMobFileCleanerChore != null) {
+      choreService.scheduleChore(rsMobFileCleanerChore);
+    }
+
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
     Threads.setDaemonThreadRunning(this.leaseManager, getName() + 
".leaseChecker",
@@ -1994,6 +2001,8 @@ public class HRegionServer extends 
HBaseServerBase<RSRpcServices>
       new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + 
jitterValue),
         brokenStoreFileCleanerPeriod, this, conf, this);
 
+    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
+
     registerConfigurationObservers();
   }
 
@@ -3550,6 +3559,11 @@ public class HRegionServer extends 
HBaseServerBase<RSRpcServices>
     return brokenStoreFileCleaner;
   }
 
+  @InterfaceAudience.Private
+  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
+    return rsMobFileCleanerChore;
+  }
+
   RSSnapshotVerifier getRsSnapshotVerifier() {
     return rsSnapshotVerifier;
   }
@@ -3566,6 +3580,7 @@ public class HRegionServer extends 
HBaseServerBase<RSRpcServices>
     shutdownChore(fsUtilizationChore);
     shutdownChore(slowLogTableOpsChore);
     shutdownChore(brokenStoreFileCleaner);
+    shutdownChore(rsMobFileCleanerChore);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 020009c7f5c..33c2910fcea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2432,7 +2432,7 @@ public class HStore
    * {@link BrokenStoreFileCleaner} to prevent deleting the these files as 
they are not present in
    * SFT yet.
    */
-  Set<Path> getStoreFilesBeingWritten() {
+  public Set<Path> getStoreFilesBeingWritten() {
     return storeFileWriterCreationTrackers.stream().flatMap(t -> 
t.get().stream())
       .collect(Collectors.toSet());
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index e66a3e05a42..bd9ce6035ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -362,7 +362,7 @@ public abstract class Compactor<T extends CellSink> {
       writer = sinkFactory.createWriter(scanner, fd, dropCache, 
request.isMajor(),
         request.getWriterCreationTracker());
       finished = performCompaction(fd, scanner, writer, smallestReadPoint, 
cleanSeqId,
-        throughputController, request.isAllFiles(), request.getFiles().size(), 
progress);
+        throughputController, request, progress);
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + 
store + " in region "
           + store.getRegionInfo().getRegionNameAsString() + " because it was 
interrupted.");
@@ -401,20 +401,19 @@ public abstract class Compactor<T extends CellSink> {
 
   /**
    * Performs the compaction.
-   * @param fd                  FileDetails of cell sink writer
-   * @param scanner             Where to read from.
-   * @param writer              Where to write to.
-   * @param smallestReadPoint   Smallest read point.
-   * @param cleanSeqId          When true, remove seqId(used to be mvcc) value 
which is &lt;=
-   *                            smallestReadPoint
-   * @param major               Is a major compaction.
-   * @param numofFilesToCompact the number of files to compact
-   * @param progress            Progress reporter.
+   * @param fd                FileDetails of cell sink writer
+   * @param scanner           Where to read from.
+   * @param writer            Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId        When true, remove seqId(used to be mvcc) value 
which is &lt;=
+   *                          smallestReadPoint
+   * @param request           compaction request.
+   * @param progress          Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for some 
reason.
    */
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
     assert writer instanceof ShipperListener;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -436,7 +435,7 @@ public abstract class Compactor<T extends CellSink> {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
     try {
       do {
         hasMore = scanner.next(cells, scannerContext);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index af45263a17c..96675fb69e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -92,8 +93,9 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
   @Override
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
 
+    boolean major = request.isAllFiles();
     totalCompactions.incrementAndGet();
     if (major) {
       totalMajorCompactions.incrementAndGet();
@@ -145,7 +147,7 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
 
     Cell mobCell = null;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
index 2753ba33af4..9b51c22db19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -18,17 +18,26 @@
 package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -112,4 +121,17 @@ public class MobTestUtil {
     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
     return HBaseTestingUtil.countRows(table, scan);
   }
+
+  public static Path generateMOBFileForRegion(Configuration conf, TableName 
tableName,
+    ColumnFamilyDescriptor familyDescriptor, String regionName) throws 
IOException {
+    Date date = new Date();
+    String dateStr = MobUtils.formatDate(date);
+    FileSystem fs = FileSystem.get(conf);
+    Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, 
familyDescriptor.getNameAsString());
+    StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, 
dateStr, cfMOBDir,
+      1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, 
Encryption.Context.NONE,
+      false, "");
+    writer.close();
+    return writer.getPath();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
index 70e7bbf4cb4..6b7ec952d05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.mob;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -31,17 +33,21 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestDefaultMobStoreFlusher {
 
@@ -61,19 +67,35 @@ public class TestDefaultMobStoreFlusher {
   @Rule
   public TestName name = new TestName();
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected Boolean useFileBasedSFT;
+
+  public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setUpBefore() throws Exception {
+    if (useFileBasedSFT) {
+      TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+        
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
     TEST_UTIL.startMiniCluster(1);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+  @After
+  public void tearDownAfter() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
   @Test
   public void testFlushNonMobFile() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = 
TableName.valueOf(TestMobUtils.getTableName(name));
     TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(tableName)
       
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
       .build();
@@ -82,7 +104,7 @@ public class TestDefaultMobStoreFlusher {
 
   @Test
   public void testFlushMobFile() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = 
TableName.valueOf(TestMobUtils.getTableName(name));
     TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(tableName)
       
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
         .setMobThreshold(3L).setMaxVersions(4).build())
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
index 5f3cc629959..1c586bbd10c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hbase.mob;
 
-import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
@@ -41,12 +39,13 @@ public class TestMobCompactionOptMode extends 
TestMobCompactionWithDefaults {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
 
-  @BeforeClass
-  public static void configureOptimizedCompaction() throws 
InterruptedException, IOException {
-    HTU.shutdownMiniHBaseCluster();
+  public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
+  protected void additonalConfigSetup() {
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
index 7b6b44d0e31..dfd6435d364 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 
Verifies that number of MOB
  * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 
rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionOptRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
@@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends 
TestMobCompactionWithDe
   private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
+  public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  @BeforeClass
-  public static void configureOptimizedCompactionAndBatches()
-    throws InterruptedException, IOException {
-    HTU.shutdownMiniHBaseCluster();
+  protected void additonalConfigSetup() {
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
index 3d6eaa0a25a..5e2806bdee2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files 
in a mob directory is
  * 20. 12 Runs scanner and checks all 3 * 1000 rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionRegularRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
@@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode 
extends TestMobCompactionWi
   private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
+  public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  @BeforeClass
-  public static void configureCompactionBatches() throws InterruptedException, 
IOException {
-    HTU.shutdownMiniHBaseCluster();
+  protected void additonalConfigSetup() {
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
index 4a1b3f1b8b3..69ba4ea24b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -41,18 +43,19 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +69,7 @@ import org.slf4j.LoggerFactory;
  * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
  * scanner and checks all 3 * 1000 rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionWithDefaults {
   private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
@@ -73,7 +77,7 @@ public class TestMobCompactionWithDefaults {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
 
-  protected static HBaseTestingUtil HTU;
+  protected HBaseTestingUtil HTU;
   protected static Configuration conf;
   protected static long minAgeToArchive = 10000;
 
@@ -95,8 +99,19 @@ public class TestMobCompactionWithDefaults {
 
   protected MobFileCleanerChore cleanerChore;
 
-  @BeforeClass
-  public static void htuStart() throws Exception {
+  protected Boolean useFileBasedSFT;
+
+  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
+  protected void htuStart() throws Exception {
     HTU = new HBaseTestingUtil();
     conf = HTU.getConfiguration();
     conf.setInt("hfile.format.version", 3);
@@ -109,21 +124,25 @@ public class TestMobCompactionWithDefaults {
     // Set compacted file discharger interval to a half minAgeToArchive
     conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive 
/ 2);
     conf.setBoolean("hbase.regionserver.compaction.enabled", false);
+    if (useFileBasedSFT) {
+      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
+    additonalConfigSetup();
     HTU.startMiniCluster();
   }
 
-  @AfterClass
-  public static void htuStop() throws Exception {
-    HTU.shutdownMiniCluster();
+  protected void additonalConfigSetup() {
   }
 
   @Before
   public void setUp() throws Exception {
+    htuStart();
     admin = HTU.getAdmin();
     cleanerChore = new MobFileCleanerChore();
     familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
       .setMobThreshold(mobLen).setMaxVersions(1).build();
-    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName())
+    tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test))
       .setColumnFamily(familyDescriptor).build();
     RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
     byte[][] splitKeys = splitAlgo.split(numRegions);
@@ -152,6 +171,7 @@ public class TestMobCompactionWithDefaults {
   public void tearDown() throws Exception {
     admin.disableTable(tableDescriptor.getTableName());
     admin.deleteTable(tableDescriptor.getTableName());
+    HTU.shutdownMiniCluster();
   }
 
   @Test
@@ -167,12 +187,12 @@ public class TestMobCompactionWithDefaults {
 
   @Test
   public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
-    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
     LOG.info("MOB compaction of cloned snapshot, " + description() + " 
started");
     loadAndFlushThreeTimes(rows, table, famStr);
     LOG.debug("Taking snapshot and cloning table {}", table);
-    admin.snapshot(test.getMethodName(), table);
-    admin.cloneSnapshot(test.getMethodName(), clone);
+    admin.snapshot(TestMobUtils.getTableName(test), table);
+    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
     assertEquals("Should have 3 hlinks per region in MOB area from snapshot 
clone", 3 * numRegions,
       getNumberOfMobFiles(clone, famStr));
     mobCompact(admin.getDescriptor(clone), familyDescriptor);
@@ -185,12 +205,12 @@ public class TestMobCompactionWithDefaults {
   @Test
   public void testMobFileCompactionAfterSnapshotCloneAndFlush()
     throws InterruptedException, IOException {
-    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
     LOG.info("MOB compaction of cloned snapshot after flush, " + description() 
+ " started");
     loadAndFlushThreeTimes(rows, table, famStr);
     LOG.debug("Taking snapshot and cloning table {}", table);
-    admin.snapshot(test.getMethodName(), table);
-    admin.cloneSnapshot(test.getMethodName(), clone);
+    admin.snapshot(TestMobUtils.getTableName(test), table);
+    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
     assertEquals("Should have 3 hlinks per region in MOB area from snapshot 
clone", 3 * numRegions,
       getNumberOfMobFiles(clone, famStr));
     loadAndFlushThreeTimes(rows, clone, famStr);
@@ -269,8 +289,11 @@ public class TestMobCompactionWithDefaults {
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    cleanerChore.cleanupObsoleteMobFiles(conf, table);
+
+    // run cleaner chore on each RS
+    for (ServerName sn : admin.getRegionServers()) {
+      HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
+    }
 
     assertEquals("After cleaning, we should have 1 MOB file per region based 
on size.", numRegions,
       getNumberOfMobFiles(table, family));
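
The parameterization above is the core of the test-side change: each test method now runs twice, once with the default store file tracker and once with the file-based tracker, by setting StoreFileTrackerFactory.TRACKER_IMPL on the cluster Configuration before startMiniCluster(). A minimal sketch of that JUnit 4 pattern follows, assuming only junit on the classpath; the test body is illustrative and does not start a cluster.

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class SftToggleSketch {
      private final boolean useFileBasedSFT;

      public SftToggleSketch(Boolean useFileBasedSFT) {
        this.useFileBasedSFT = useFileBasedSFT;
      }

      @Parameterized.Parameters
      public static Collection<Boolean> data() {
        // each @Test below executes once per element
        return Arrays.asList(false, true);
      }

      @Test
      public void reportsChosenTracker() {
        String impl = useFileBasedSFT
          ? "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"
          : "(default tracker)";
        // a real test would set this on the Configuration before starting the mini cluster
        System.out.println("store file tracker: " + impl);
      }
    }
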
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
index f950c18dd18..bdc3cce13e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mob;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -166,14 +167,38 @@ public class TestMobFileCleanerChore {
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
+    // Cleanup
     chore.cleanupObsoleteMobFiles(conf, table.getName());
 
+    // verify that nothing has happened
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(1, num);
+    assertEquals(4, num);
 
     long scanned = scanTable();
     assertEquals(30, scanned);
+
+    // add a MOB file with a name referring to a non-existent region
+    Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
+      familyDescriptor, "nonExistentRegion");
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(5, num);
+
+    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+    Thread.sleep(minAgeToArchive + 1000);
+    LOG.info("Cleaning up MOB files");
+    chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+    // check that the extra file got deleted
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4, num);
+
+    FileSystem fs = FileSystem.get(conf);
+    assertFalse(fs.exists(extraMOBFile));
+
+    scanned = scanTable();
+    assertEquals(30, scanned);
+
   }
 
   private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
index 9951bef4747..c2f2b3fd426 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
@@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionAsTable;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -76,12 +80,15 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test mob store compaction
  */
+@RunWith(Parameterized.class)
 @Category(MediumTests.class)
 public class TestMobStoreCompaction {
 
@@ -106,15 +113,33 @@ public class TestMobStoreCompaction {
   private final byte[] STARTROW = Bytes.toBytes(START_KEY);
   private int compactionThreshold;
 
+  private Boolean useFileBasedSFT;
+
+  public TestMobStoreCompaction(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
   private void init(Configuration conf, long mobThreshold) throws Exception {
+    if (useFileBasedSFT) {
+      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
+
     this.conf = conf;
     this.mobCellThreshold = mobThreshold;
+
     HBaseTestingUtil UTIL = new HBaseTestingUtil(conf);
 
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
     familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
       .setMobThreshold(mobThreshold).setMaxVersions(1).build();
-    tableDescriptor = UTIL.createModifyableTableDescriptor(name.getMethodName())
+    tableDescriptor = UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(name))
       .modifyColumnFamily(familyDescriptor).build();
 
     RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
@@ -223,7 +248,7 @@ public class TestMobStoreCompaction {
     Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
     List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
     for (int i = 0; i < compactionThreshold; i++) {
-      Path hpath = new Path(basedir, "hfile" + i);
+      Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", 
""));
       hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
       createHFile(hpath, i, dummyData);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
index 247dfff77d1..32170e98b35 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
@@ -89,4 +90,8 @@ public class TestMobUtils {
     assertTrue(testTable3Refs.contains("file3a"));
     assertTrue(testTable3Refs.contains("file3b"));
   }
+
+  public static String getTableName(TestName test) {
+    return test.getMethodName().replace("[", "-").replace("]", "");
+  }
 }
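
The helper added above exists because, under the Parameterized runner, TestName.getMethodName() returns values such as "testMobFileCompactionAfterSnapshotClone[1]", and square brackets are not valid in HBase table names. A small, illustrative stand-alone version of the transformation:

    public class TableNameFromTestNameSketch {
      // mirrors TestMobUtils.getTableName: fold the Parameterized index into the name
      static String getTableName(String junitMethodName) {
        return junitMethodName.replace("[", "-").replace("]", "");
      }

      public static void main(String[] args) {
        // prints "testMobFileCompactionAfterSnapshotClone-1"
        System.out.println(getTableName("testMobFileCompactionAfterSnapshotClone[1]"));
      }
    }
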
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
similarity index 74%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
index f950c18dd18..86cb3558f67 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
@@ -22,18 +22,22 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
@@ -51,16 +55,16 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs
- * major MOB compaction (N MOB files go to archive) 4. Verifies that number of MOB files in a mob
- * directory is N+1 5. Waits for a period of time larger than minimum age to archive 6. Runs Mob
- * cleaner chore 7 Verifies that number of MOB files in a mob directory is 1.
+ * major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for
+ * a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that
+ * every old MOB file referenced from current RS was archived
  */
 @Category(MediumTests.class)
-public class TestMobFileCleanerChore {
-  private static final Logger LOG = LoggerFactory.getLogger(TestMobFileCleanerChore.class);
+public class TestRSMobFileCleanerChore {
+  private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMobFileCleanerChore.class);
+    HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
 
   private HBaseTestingUtil HTU;
 
@@ -76,10 +80,10 @@ public class TestMobFileCleanerChore {
   private ColumnFamilyDescriptor familyDescriptor;
   private Admin admin;
   private Table table = null;
-  private MobFileCleanerChore chore;
+  private RSMobFileCleanerChore chore;
   private long minAgeToArchive = 10000;
 
-  public TestMobFileCleanerChore() {
+  public TestRSMobFileCleanerChore() {
   }
 
   @Before
@@ -91,12 +95,11 @@ public class TestMobFileCleanerChore {
 
     HTU.startMiniCluster();
     admin = HTU.getAdmin();
-    chore = new MobFileCleanerChore();
     familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
       .setMobThreshold(mobLen).setMaxVersions(1).build();
     tableDescriptor = HTU.createModifyableTableDescriptor("testMobCompactTable")
       .setColumnFamily(familyDescriptor).build();
-    table = HTU.createTable(tableDescriptor, null);
+    table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
   }
 
   private void initConf() {
@@ -145,12 +148,11 @@ public class TestMobFileCleanerChore {
 
   @Test
   public void testMobFileCleanerChore() throws InterruptedException, IOException {
-
     loadData(0, 10);
     loadData(10, 10);
-    loadData(20, 10);
+    // loadData(20, 10);
     long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(3, num);
+    assertEquals(2, num);
     // Major compact
     admin.majorCompact(tableDescriptor.getTableName(), fam);
     // wait until compaction is complete
@@ -159,21 +161,68 @@ public class TestMobFileCleanerChore {
     }
 
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(4, num);
+    assertEquals(3, num);
     // We have guarantee, that compcated file discharger will run during this pause
     // because it has interval less than this wait time
     LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+    ServerName serverUsed = null;
+    List<RegionInfo> serverRegions = null;
+    for (ServerName sn : admin.getRegionServers()) {
+      serverRegions = admin.getRegions(sn);
+      if (serverRegions != null && serverRegions.size() > 0) {
+        // filter out regions that do not belong to the test table
+        serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName())
+          .collect(Collectors.toList());
+        // if one is found, use this RS
+        if (serverRegions.size() > 0) {
+          serverUsed = sn;
+        }
+        break;
+      }
+    }
+
+    chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
+
+    chore.chore();
 
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(1, num);
+    assertEquals(3 - serverRegions.size(), num);
 
     long scanned = scanTable();
-    assertEquals(30, scanned);
+    assertEquals(20, scanned);
+
+    // creating a MOB file not referenced from the current RS
+    Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
+      familyDescriptor, "nonExistentRegion");
+
+    // verifying the new MOB file is added
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4 - serverRegions.size(), num);
+
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue(fs.exists(extraMOBFile));
+
+    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+    Thread.sleep(minAgeToArchive + 1000);
+    LOG.info("Cleaning up MOB files");
+
+    // running chore again
+    chore.chore();
+
+    // the chore should only archive old MOB files that were referenced from the current RS
+    // the unrelated MOB file is still there
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4 - serverRegions.size(), num);
+
+    assertTrue(fs.exists(extraMOBFile));
+
+    scanned = scanTable();
+    assertEquals(20, scanned);
   }
 
   private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)

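The new test above and the updated TestMobCompactionWithDefaults both drive the region-server-side cleaner directly. For reference, the shared pattern factored into one helper; this is a sketch only, assuming the accessors introduced by this change (getRegionServer(ServerName) on the mini cluster and HRegionServer.getRSMobFileCleanerChore()) and placed in the org.apache.hadoop.hbase.mob package, as the tests are, so the chore's run method is accessible.

    package org.apache.hadoop.hbase.mob;

    import java.io.IOException;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
    import org.apache.hadoop.hbase.client.Admin;

    public final class RunRSMobCleanerChoreSketch {
      private RunRSMobCleanerChoreSketch() {
      }

      /** Runs one cleanup pass of the RS MOB cleaner chore on every live region server. */
      public static void runOnAllRegionServers(Admin admin, SingleProcessHBaseCluster cluster)
        throws IOException {
        for (ServerName sn : admin.getRegionServers()) {
          cluster.getRegionServer(sn).getRSMobFileCleanerChore().chore();
        }
      }
    }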