This is an automated email from the ASF dual-hosted git repository.

psomogyi pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d073404706bf2c796d78ff6f7157172558118b7a
Author: BukrosSzabolcs <[email protected]>
AuthorDate: Tue Jun 21 11:18:55 2022 +0200

    HBASE-26969:Eliminate MOB renames when SFT is enabled
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
    Signed-off-by: Peter Somogyi <[email protected]>
    
    closes #4712
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  29 ++-
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |  16 +-
 .../hadoop/hbase/mob/MobFileCleanerChore.java      |  44 ++--
 .../java/org/apache/hadoop/hbase/mob/MobUtils.java |  31 ++-
 .../hadoop/hbase/mob/RSMobFileCleanerChore.java    | 272 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/HMobStore.java       |  39 ++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  15 ++
 .../apache/hadoop/hbase/regionserver/HStore.java   |   2 +-
 .../hbase/regionserver/compactions/Compactor.java  |  23 +-
 .../hadoop/hbase/mob/FaultyMobStoreCompactor.java  |   6 +-
 .../org/apache/hadoop/hbase/mob/MobTestUtil.java   |  22 ++
 .../hbase/mob/TestDefaultMobStoreFlusher.java      |  68 ++++--
 .../hadoop/hbase/mob/TestMobCompactionBase.java    | 218 -----------------
 .../hadoop/hbase/mob/TestMobCompactionOptMode.java |  11 +-
 .../mob/TestMobCompactionOptRegionBatchMode.java   |  14 +-
 .../hbase/mob/TestMobCompactionRegularMode.java    |  68 ------
 .../TestMobCompactionRegularRegionBatchMode.java   |  13 +-
 .../hbase/mob/TestMobCompactionWithDefaults.java   |  55 +++--
 .../hadoop/hbase/mob/TestMobFileCleanerChore.java  |  33 ++-
 .../hadoop/hbase/mob/TestMobStoreCompaction.java   |  28 ++-
 .../org/apache/hadoop/hbase/mob/TestMobUtils.java  |   5 +
 ...erChore.java => TestRSMobFileCleanerChore.java} | 113 ++++++---
 22 files changed, 692 insertions(+), 433 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index e93d87237cc..e46eb0f6209 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -294,20 +294,20 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
    * @param cleanSeqId           When true, remove seqId(used to be mvcc) 
value which is <=
    *                             smallestReadPoint
    * @param throughputController The compaction throughput controller.
-   * @param major                Is a major compaction.
-   * @param numofFilesToCompact  the number of files to compact
+   * @param request              compaction request.
    * @param progress             Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any 
reason.
    */
   @Override
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
     mobRefSet.get().clear();
     boolean isUserRequest = userRequest.get();
+    boolean major = request.isAllFiles();
     boolean compactMOBs = major && isUserRequest;
     boolean discardMobMiss = 
conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
       MobConstants.DEFAULT_MOB_DISCARD_MISS);
@@ -351,12 +351,12 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
 
     Cell mobCell = null;
     try {
 
-      mobFileWriter = newMobWriter(fd, major);
+      mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
       fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
 
       do {
@@ -426,7 +426,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
                       LOG.debug("Closing output MOB File, length={} file={}, 
store={}", len,
                         mobFileWriter.getPath().getName(), getStoreInfo());
                       commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, 
mobCells, major);
-                      mobFileWriter = newMobWriter(fd, major);
+                      mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                       fileName = 
Bytes.toBytes(mobFileWriter.getPath().getName());
                       mobCells = 0;
                     }
@@ -470,7 +470,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
                   long len = mobFileWriter.getPos();
                   if (len > maxMobFileSize) {
                     commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, 
mobCells, major);
-                    mobFileWriter = newMobWriter(fd, major);
+                    mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                     fileName = 
Bytes.toBytes(mobFileWriter.getPath().getName());
                     mobCells = 0;
                   }
@@ -522,7 +522,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
               long len = mobFileWriter.getPos();
               if (len > maxMobFileSize) {
                 commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, 
major);
-                mobFileWriter = newMobWriter(fd, major);
+                mobFileWriter = newMobWriter(fd, major, 
request.getWriterCreationTracker());
                 fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                 mobCells = 0;
               }
@@ -608,11 +608,16 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     }
   }
 
-  private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws 
IOException {
+  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
+    Consumer<Path> writerCreationTracker) throws IOException {
     try {
-      StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new 
Date(fd.latestPutTs),
-        fd.maxKeyCount, major ? majorCompactionCompression : 
minorCompactionCompression,
-        store.getRegionInfo().getStartKey(), true);
+      StoreFileWriter mobFileWriter = 
mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+          major ? majorCompactionCompression : minorCompactionCompression,
+          store.getRegionInfo().getStartKey(), true)
+        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
+          major ? majorCompactionCompression : minorCompactionCompression,
+          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
       LOG.debug("New MOB writer created={} store={}", 
mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a7f2ecdf242..6df17e58e22 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
         try {
           // It's a mob store, flush the cells in a mob way. This is the 
difference of flushing
           // between a normal and a mob store.
-          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, 
throughputController);
+          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, 
throughputController,
+            writerCreationTracker);
         } catch (IOException ioe) {
           e = ioe;
           // throw the exception out
@@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
    */
   protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
     InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
-    ThroughputController throughputController) throws IOException {
+    ThroughputController throughputController, Consumer<Path> 
writerCreationTracker)
+    throws IOException {
     StoreFileWriter mobFileWriter = null;
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, 
HConstants.COMPACTION_KV_MAX_DEFAULT);
     long mobCount = 0;
     long mobSize = 0;
     long time = snapshot.getTimeRangeTracker().getMax();
-    mobFileWriter = mobStore.createWriterInTmp(new Date(time), 
snapshot.getCellsCount(),
-      store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
-      false);
+    mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+      ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
+        store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
+        false)
+      : mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
+        store.getColumnFamilyDescriptor().getCompressionType(), 
store.getRegionInfo().getStartKey(),
+        false, writerCreationTracker);
     // the target path is {tableName}/.mob/{cfName}/mobFiles
     // the relative path is mobFiles
     byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index e2c1f8961de..eb2201417b0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -168,14 +168,15 @@ public class MobFileCleanerChore extends ScheduledChore {
           maxCreationTimeToArchive, table);
       }
 
+      FileSystem fs = FileSystem.get(conf);
+      Set<String> regionNames = new HashSet<>();
       Path rootDir = CommonFSUtils.getRootDir(conf);
       Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
-      // How safe is this call?
-      List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), 
tableDir);
+      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
 
       Set<String> allActiveMobFileName = new HashSet<String>();
-      FileSystem fs = FileSystem.get(conf);
       for (Path regionPath : regionDirs) {
+        regionNames.add(regionPath.getName());
         for (ColumnFamilyDescriptor hcd : list) {
           String family = hcd.getNameAsString();
           Path storePath = new Path(regionPath, family);
@@ -203,13 +204,26 @@ public class MobFileCleanerChore extends ScheduledChore {
               for (Path pp : storeFiles) {
                 currentPath = pp;
                 LOG.trace("Store file: {}", pp);
-                HStoreFile sf =
-                  new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
-                sf.initReader();
-                byte[] mobRefData = 
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-                byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
-                // close store file to avoid memory leaks
-                sf.closeStoreFile(true);
+                HStoreFile sf = null;
+                byte[] mobRefData = null;
+                byte[] bulkloadMarkerData = null;
+                try {
+                  sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
+                  sf.initReader();
+                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                  bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                  // close store file to avoid memory leaks
+                  sf.closeStoreFile(true);
+                } catch (IOException ex) {
+                  // When FileBased SFT is active the store dir can contain 
corrupted or incomplete
+                  // files. So read errors are expected. We just skip these 
files.
+                  if (ex instanceof FileNotFoundException) {
+                    throw ex;
+                  }
+                  LOG.debug("Failed to get mob data from file: {} due to 
error.", pp.toString(),
+                    ex);
+                  continue;
+                }
                 if (mobRefData == null) {
                   if (bulkloadMarkerData == null) {
                     LOG.warn("Found old store file with no MOB_FILE_REFS: {} - 
"
@@ -264,9 +278,11 @@ public class MobFileCleanerChore extends ScheduledChore {
         while (rit.hasNext()) {
           LocatedFileStatus lfs = rit.next();
           Path p = lfs.getPath();
-          if (!allActiveMobFileName.contains(p.getName())) {
-            // MOB is not in a list of active references, but it can be too
-            // fresh, skip it in this case
+          String[] mobParts = p.getName().split("_");
+          String regionName = mobParts[mobParts.length - 1];
+
+          if (!regionNames.contains(regionName)) {
+            // MOB belonged to a region no longer hosted
             long creationTime = fs.getFileStatus(p).getModificationTime();
             if (creationTime < maxCreationTimeToArchive) {
               LOG.trace("Archiving MOB file {} creation time={}", p,
@@ -277,7 +293,7 @@ public class MobFileCleanerChore extends ScheduledChore {
                 fs.getFileStatus(p).getModificationTime());
             }
           } else {
-            LOG.trace("Keeping active MOB file: {}", p);
+            LOG.trace("Keeping MOB file with existing region: {}", p);
           }
         }
         LOG.info(" MOB Cleaner found {} files to archive for table={} 
family={}", toArchive.size(),
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index a409c9f42c4..aab589cc13f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -584,6 +585,33 @@ public final class MobUtils {
     CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType 
checksumType,
     int bytesPerChecksum, int blocksize, BloomType bloomType, boolean 
isCompaction)
     throws IOException {
+    return createWriter(conf, fs, family, path, maxKeyCount, compression, 
cacheConfig,
+      cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, 
isCompaction, null);
+  }
+
+  /**
+   * Creates a writer for the mob file in temp directory.
+   * @param conf                  The current configuration.
+   * @param fs                    The current file system.
+   * @param family                The descriptor of the current column family.
+   * @param path                  The path for a temp directory.
+   * @param maxKeyCount           The key count.
+   * @param compression           The compression algorithm.
+   * @param cacheConfig           The current cache config.
+   * @param cryptoContext         The encryption context.
+   * @param checksumType          The checksum type.
+   * @param bytesPerChecksum      The bytes per checksum.
+   * @param blocksize             The HFile block size.
+   * @param bloomType             The bloom filter type.
+   * @param isCompaction          If the writer is used in compaction.
+   * @param writerCreationTracker to track the current writer in the store
+   * @return The writer for the mob file.
+   */
+  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
+    ColumnFamilyDescriptor family, Path path, long maxKeyCount, 
Compression.Algorithm compression,
+    CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType 
checksumType,
+    int bytesPerChecksum, int blocksize, BloomType bloomType, boolean 
isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
     if (compression == null) {
       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
     }
@@ -602,7 +630,8 @@ public final class MobUtils {
       .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
 
     StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, 
fs).withFilePath(path)
-      
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+      
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
+      .withWriterCreationTracker(writerCreationTracker).build();
     return w;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
new file mode 100644
index 00000000000..06e34988733
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
+/**
+ * The class RSMobFileCleanerChore for running cleaner regularly to remove the 
obsolete (files which
+ * have no active references to) mob files that were referenced from the 
current RS.
+ */
[email protected]
+public class RSMobFileCleanerChore extends ScheduledChore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RSMobFileCleanerChore.class);
+  private final HRegionServer rs;
+
+  public RSMobFileCleanerChore(HRegionServer rs) {
+    super(rs.getServerName() + "-MobFileCleanerChore", rs,
+      rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
+      Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+        MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
+        * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
+      TimeUnit.SECONDS);
+    // to prevent a load spike on the fs the initial delay is modified by +/- 
50%
+    this.rs = rs;
+  }
+
+  public RSMobFileCleanerChore() {
+    this.rs = null;
+  }
+
+  @Override
+  protected void chore() {
+
+    long minAgeToArchive = 
rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
+      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
+    // We check only those MOB files, which creation time is less
+    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
+    // gives us full confidence that all corresponding store files will
+    // exist at the time cleaning procedure begins and will be examined.
+    // So, if MOB file creation time is greater than this maxTimeToArchive,
+    // this will be skipped and won't be archived.
+    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - 
minAgeToArchive;
+
+    TableDescriptors htds = rs.getTableDescriptors();
+    try {
+      FileSystem fs = FileSystem.get(rs.getConfiguration());
+
+      Map<String, TableDescriptor> map = null;
+      try {
+        map = htds.getAll();
+      } catch (IOException e) {
+        LOG.error("MobFileCleanerChore failed", e);
+        return;
+      }
+      Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
+      for (TableDescriptor htd : map.values()) {
+        // Now clean obsolete files for a table
+        LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
+        List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+        List<HRegion> regions = rs.getRegions(htd.getTableName());
+        for (HRegion region : regions) {
+          for (ColumnFamilyDescriptor hcd : list) {
+            HStore store = region.getStore(hcd.getName());
+            Collection<HStoreFile> sfs = store.getStorefiles();
+            Set<String> regionMobs = new HashSet<String>();
+            Path currentPath = null;
+            try {
+              // collectinng referenced MOBs
+              for (HStoreFile sf : sfs) {
+                currentPath = sf.getPath();
+                sf.initReader();
+                byte[] mobRefData = 
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                // close store file to avoid memory leaks
+                sf.closeStoreFile(true);
+                if (mobRefData == null) {
+                  if (bulkloadMarkerData == null) {
+                    LOG.warn(
+                      "Found old store file with no MOB_FILE_REFS: {} - "
+                        + "can not proceed until all old files will be 
MOB-compacted.",
+                      currentPath);
+                    return;
+                  } else {
+                    LOG.debug("Skipping file without MOB references 
(bulkloaded file):{}",
+                      currentPath);
+                    continue;
+                  }
+                }
+                // file may or may not have MOB references, but was created by 
the distributed
+                // mob compaction code.
+                try {
+                  SetMultimap<TableName, String> mobs =
+                    MobUtils.deserializeMobFileRefs(mobRefData).build();
+                  LOG.debug("Found {} mob references for store={}", 
mobs.size(), sf);
+                  LOG.trace("Specific mob references found for store={} : {}", 
sf, mobs);
+                  regionMobs.addAll(mobs.values());
+                } catch (RuntimeException exception) {
+                  throw new IOException("failure getting mob references for 
hfile " + sf,
+                    exception);
+                }
+              }
+              // collecting files, MOB included currently being written
+              regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
+                .map(path -> path.getName()).collect(Collectors.toList()));
+
+              referencedMOBs
+                .computeIfAbsent(hcd.getNameAsString(), cf -> new 
HashMap<String, List<String>>())
+                .computeIfAbsent(region.getRegionInfo().getEncodedName(), name 
-> new ArrayList<>())
+                .addAll(regionMobs);
+
+            } catch (FileNotFoundException e) {
+              LOG.warn(
+                "Missing file:{} Starting MOB cleaning cycle from the 
beginning" + " due to error",
+                currentPath, e);
+              regionMobs.clear();
+              continue;
+            } catch (IOException e) {
+              LOG.error("Failed to clean the obsolete mob files for table={}",
+                htd.getTableName().getNameAsString(), e);
+            }
+          }
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found: {} active mob refs for table={}",
+            referencedMOBs.values().stream().map(inner -> inner.values())
+              .flatMap(lists -> lists.stream()).mapToInt(lists -> 
lists.size()).sum(),
+            htd.getTableName().getNameAsString());
+        }
+        if (LOG.isTraceEnabled()) {
+          referencedMOBs.values().stream().forEach(innerMap -> 
innerMap.values().stream()
+            .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
+        }
+
+        // collect regions referencing MOB files belonging to the current rs
+        Set<String> regionsCovered = new HashSet<>();
+        referencedMOBs.values().stream()
+          .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
+
+        for (ColumnFamilyDescriptor hcd : list) {
+          List<Path> toArchive = new ArrayList<Path>();
+          String family = hcd.getNameAsString();
+          Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), 
htd.getTableName(), family);
+          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
+          while (rit.hasNext()) {
+            LocatedFileStatus lfs = rit.next();
+            Path p = lfs.getPath();
+            String[] mobParts = p.getName().split("_");
+            String regionName = mobParts[mobParts.length - 1];
+
+            // skip MOB files not belonging to a region assigned to the 
current rs
+            if (!regionsCovered.contains(regionName)) {
+              LOG.trace("MOB file does not belong to current rs: {}", p);
+              continue;
+            }
+
+            // check active or actively written mob files
+            Map<String, List<String>> cfMobs = 
referencedMOBs.get(hcd.getNameAsString());
+            if (
+              cfMobs != null && cfMobs.get(regionName) != null
+                && cfMobs.get(regionName).contains(p.getName())
+            ) {
+              LOG.trace("Keeping active MOB file: {}", p);
+              continue;
+            }
+
+            // MOB is not in a list of active references, but it can be too
+            // fresh, skip it in this case
+            long creationTime = fs.getFileStatus(p).getModificationTime();
+            if (creationTime < maxCreationTimeToArchive) {
+              LOG.trace("Archiving MOB file {} creation time={}", p,
+                (fs.getFileStatus(p).getModificationTime()));
+              toArchive.add(p);
+            } else {
+              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
+                fs.getFileStatus(p).getModificationTime());
+            }
+
+          }
+          LOG.info(" MOB Cleaner found {} files to archive for table={} 
family={}",
+            toArchive.size(), htd.getTableName().getNameAsString(), family);
+          archiveMobFiles(rs.getConfiguration(), htd.getTableName(), 
family.getBytes(), toArchive);
+          LOG.info(" MOB Cleaner archived {} files, table={} family={}", 
toArchive.size(),
+            htd.getTableName().getNameAsString(), family);
+        }
+
+        LOG.info("Cleaning obsolete MOB files finished for table={}", 
htd.getTableName());
+
+      }
+    } catch (IOException e) {
+      LOG.error("MOB Cleaner failed when trying to access the file system", e);
+    }
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf       The current configuration.
+   * @param tableName  The table name.
+   * @param family     The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException exception
+   */
+  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] 
family,
+    List<Path> storeFiles) throws IOException {
+
+    if (storeFiles.size() == 0) {
+      // nothing to remove
+      LOG.debug("Skipping archiving old MOB files - no files found for 
table={} cf={}", tableName,
+        Bytes.toString(family));
+      return;
+    }
+    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), 
tableName);
+    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
+
+    for (Path p : storeFiles) {
+      LOG.debug("MOB Cleaner is archiving: {}", p);
+      HFileArchiver.archiveStoreFile(conf, fs, 
MobUtils.getMobRegionInfo(tableName), mobTableDir,
+        family, p);
+    }
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index ac69eb8d324..5992c5941e9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -184,7 +185,27 @@ public class HMobStore extends HStore {
     }
     Path path = getTempDir();
     return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, 
compression, startKey,
-      isCompaction);
+      isCompaction, null);
+  }
+
+  /**
+   * Creates the writer for the mob file in the mob family directory.
+   * @param date         The latest date of written cells.
+   * @param maxKeyCount  The key count.
+   * @param compression  The compression algorithm.
+   * @param startKey     The start key.
+   * @param isCompaction If the writer is used in compaction.
+   * @return The writer for the mob file. n
+   */
+  public StoreFileWriter createWriter(Date date, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
+    if (startKey == null) {
+      startKey = HConstants.EMPTY_START_ROW;
+    }
+    Path path = getPath();
+    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, 
compression, startKey,
+      isCompaction, writerCreationTracker);
   }
 
   /**
@@ -198,11 +219,13 @@ public class HMobStore extends HStore {
    * @return The writer for the mob file. n
    */
   public StoreFileWriter createWriterInTmp(String date, Path basePath, long 
maxKeyCount,
-    Compression.Algorithm compression, byte[] startKey, boolean isCompaction) 
throws IOException {
+    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+    Consumer<Path> writerCreationTracker) throws IOException {
     MobFileName mobFileName =
       MobFileName.create(startKey, date, 
UUID.randomUUID().toString().replaceAll("-", ""),
         getHRegion().getRegionInfo().getEncodedName());
-    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, 
isCompaction);
+    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, 
isCompaction,
+      writerCreationTracker);
   }
 
   /**
@@ -214,13 +237,15 @@ public class HMobStore extends HStore {
    * @param isCompaction If the writer is used in compaction.
    * @return The writer for the mob file. n
    */
+
   public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path 
basePath, long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction) throws 
IOException {
+    Compression.Algorithm compression, boolean isCompaction, Consumer<Path> 
writerCreationTracker)
+    throws IOException {
     return MobUtils.createWriter(conf, getFileSystem(), 
getColumnFamilyDescriptor(),
       new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, 
getCacheConfig(),
       getStoreContext().getEncryptionContext(), 
StoreUtils.getChecksumType(conf),
       StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), 
BloomType.NONE,
-      isCompaction);
+      isCompaction, writerCreationTracker);
   }
 
   /**
@@ -234,6 +259,10 @@ public class HMobStore extends HStore {
     }
     Path dstPath = new Path(targetPath, sourceFile.getName());
     validateMobFile(sourceFile);
+    if (sourceFile.equals(targetPath)) {
+      LOG.info("File is already in the destination dir: {}", sourceFile);
+      return;
+    }
     LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, 
dstPath);
     Path parent = dstPath.getParent();
     if (!getFileSystem().exists(parent)) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5408685311e..6980003b6c1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
@@ -553,6 +554,8 @@ public class HRegionServer extends Thread
 
   private BrokenStoreFileCleaner brokenStoreFileCleaner;
 
+  private RSMobFileCleanerChore rsMobFileCleanerChore;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -2203,6 +2206,10 @@ public class HRegionServer extends Thread
       choreService.scheduleChore(brokenStoreFileCleaner);
     }
 
+    if (this.rsMobFileCleanerChore != null) {
+      choreService.scheduleChore(rsMobFileCleanerChore);
+    }
+
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
     Threads.setDaemonThreadRunning(this.leaseManager, getName() + 
".leaseChecker",
@@ -2299,6 +2306,8 @@ public class HRegionServer extends Thread
       new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + 
jitterValue),
         brokenStoreFileCleanerPeriod, this, conf, this);
 
+    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
+
     registerConfigurationObservers();
   }
 
@@ -2781,6 +2790,7 @@ public class HRegionServer extends Thread
       shutdownChore(storefileRefresher);
       shutdownChore(fsUtilizationChore);
       shutdownChore(slowLogTableOpsChore);
+      shutdownChore(rsMobFileCleanerChore);
       // cancel the remaining scheduled chores (in case we missed out any)
       // TODO: cancel will not cleanup the chores, so we need make sure we do 
not miss any
       choreService.shutdown();
@@ -4056,6 +4066,11 @@ public class HRegionServer extends Thread
     return brokenStoreFileCleaner;
   }
 
+  @InterfaceAudience.Private
+  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
+    return rsMobFileCleanerChore;
+  }
+
   RSSnapshotVerifier getRsSnapshotVerifier() {
     return rsSnapshotVerifier;
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b0dfa92336e..c698cfd8aba 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2421,7 +2421,7 @@ public class HStore
    * {@link BrokenStoreFileCleaner} to prevent deleting the these files as 
they are not present in
    * SFT yet.
    */
-  Set<Path> getStoreFilesBeingWritten() {
+  public Set<Path> getStoreFilesBeingWritten() {
     return storeFileWriterCreationTrackers.stream().flatMap(t -> 
t.get().stream())
       .collect(Collectors.toSet());
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 0de3eeb024d..566e708b06d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -361,7 +361,7 @@ public abstract class Compactor<T extends CellSink> {
       writer = sinkFactory.createWriter(scanner, fd, dropCache, 
request.isMajor(),
         request.getWriterCreationTracker());
       finished = performCompaction(fd, scanner, writer, smallestReadPoint, 
cleanSeqId,
-        throughputController, request.isAllFiles(), request.getFiles().size(), 
progress);
+        throughputController, request, progress);
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + 
store + " in region "
           + store.getRegionInfo().getRegionNameAsString() + " because it was 
interrupted.");
@@ -400,20 +400,19 @@ public abstract class Compactor<T extends CellSink> {
 
   /**
    * Performs the compaction.
-   * @param fd                  FileDetails of cell sink writer
-   * @param scanner             Where to read from.
-   * @param writer              Where to write to.
-   * @param smallestReadPoint   Smallest read point.
-   * @param cleanSeqId          When true, remove seqId(used to be mvcc) value 
which is &lt;=
-   *                            smallestReadPoint
-   * @param major               Is a major compaction.
-   * @param numofFilesToCompact the number of files to compact
-   * @param progress            Progress reporter.
+   * @param fd                FileDetails of cell sink writer
+   * @param scanner           Where to read from.
+   * @param writer            Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId        When true, remove seqId(used to be mvcc) value 
which is &lt;=
+   *                          smallestReadPoint
+   * @param request           compaction request.
+   * @param progress          Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for some 
reason.
    */
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
     assert writer instanceof ShipperListener;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -435,7 +434,7 @@ public abstract class Compactor<T extends CellSink> {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
     try {
       do {
         hasMore = scanner.next(cells, scannerContext);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 324b897d9fd..d37cc605f6b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -91,8 +92,9 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
   @Override
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, 
CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController 
throughputController,
-    boolean major, int numofFilesToCompact, CompactionProgress progress) 
throws IOException {
+    CompactionRequestImpl request, CompactionProgress progress) throws 
IOException {
 
+    boolean major = request.isAllFiles();
     totalCompactions.incrementAndGet();
     if (major) {
       totalMajorCompactions.incrementAndGet();
@@ -143,7 +145,7 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
     long shippedCallSizeLimit =
-      (long) numofFilesToCompact * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
 
     Cell mobCell = null;
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
index 54563e2eb90..2641ed227ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -18,17 +18,26 @@
 package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -112,4 +121,17 @@ public class MobTestUtil {
     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
     return util.countRows(table, scan);
   }
+
+  public static Path generateMOBFileForRegion(Configuration conf, TableName 
tableName,
+    ColumnFamilyDescriptor familyDescriptor, String regionName) throws 
IOException {
+    Date date = new Date();
+    String dateStr = MobUtils.formatDate(date);
+    FileSystem fs = FileSystem.get(conf);
+    Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, 
familyDescriptor.getNameAsString());
+    StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, 
dateStr, cfMOBDir,
+      1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, 
Encryption.Context.NONE,
+      false, "");
+    writer.close();
+    return writer.getPath();
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
index a07a3b0ab2d..056afaf6fc4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@ -17,30 +17,37 @@
  */
 package org.apache.hadoop.hbase.mob;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestDefaultMobStoreFlusher {
 
@@ -60,41 +67,52 @@ public class TestDefaultMobStoreFlusher {
   @Rule
   public TestName name = new TestName();
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected Boolean useFileBasedSFT;
+
+  public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setUpBefore() throws Exception {
+    if (useFileBasedSFT) {
+      TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+        
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
     TEST_UTIL.startMiniCluster(1);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+  @After
+  public void tearDownAfter() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
   @Test
   public void testFlushNonMobFile() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor(family);
-    hcd.setMaxVersions(4);
-    desc.addFamily(hcd);
-
-    testFlushFile(desc);
+    final TableName tableName = 
TableName.valueOf(TestMobUtils.getTableName(name));
+    TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(tableName)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
+      .build();
+    testFlushFile(tableDescriptor);
   }
 
   @Test
   public void testFlushMobFile() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor(family);
-    hcd.setMobEnabled(true);
-    hcd.setMobThreshold(3L);
-    hcd.setMaxVersions(4);
-    desc.addFamily(hcd);
-
-    testFlushFile(desc);
+    final TableName tableName = 
TableName.valueOf(TestMobUtils.getTableName(name));
+    TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(tableName)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
+        .setMobThreshold(3L).setMaxVersions(4).build())
+      .build();
+    testFlushFile(tableDescriptor);
   }
 
-  private void testFlushFile(HTableDescriptor htd) throws Exception {
+  private void testFlushFile(TableDescriptor htd) throws Exception {
     Table table = null;
     try {
       table = TEST_UTIL.createTable(htd, null);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java
deleted file mode 100644
index 62ca370197b..00000000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.CompactionState;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RegionSplitter;
-import org.junit.After;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Mob file compaction base test. 1. Enables batch mode for regular MOB 
compaction, Sets batch size
- * to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum 
age to archive to 10
- * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized 
keys, 1000 rows), flushes
- * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob 
files (equals to
- * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that 
number of MOB files in a
- * mob directory is 20 x4 = 80 9. Waits for a period of time larger than 
minimum age to archive 10.
- * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob 
directory is 20. 12 Runs
- * scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-public abstract class TestMobCompactionBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobCompactionBase.class);
-
-  protected HBaseTestingUtility HTU;
-
-  protected final static String famStr = "f1";
-  protected final static byte[] fam = Bytes.toBytes(famStr);
-  protected final static byte[] qualifier = Bytes.toBytes("q1");
-  protected final static long mobLen = 10;
-  protected final static byte[] mobVal = Bytes
-    
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
-
-  protected Configuration conf;
-  protected HTableDescriptor hdt;
-  private HColumnDescriptor hcd;
-  protected Admin admin;
-  protected Table table = null;
-  protected long minAgeToArchive = 10000;
-  protected int numRegions = 20;
-  protected int rows = 1000;
-
-  protected MobFileCleanerChore cleanerChore;
-
-  public TestMobCompactionBase() {
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    HTU = new HBaseTestingUtility();
-    hdt = HTU.createTableDescriptor(getClass().getName());
-    conf = HTU.getConfiguration();
-
-    initConf();
-
-    HTU.startMiniCluster();
-    admin = HTU.getAdmin();
-    cleanerChore = new MobFileCleanerChore();
-    hcd = new HColumnDescriptor(fam);
-    hcd.setMobEnabled(true);
-    hcd.setMobThreshold(mobLen);
-    hcd.setMaxVersions(1);
-    hdt.addFamily(hcd);
-    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
-    byte[][] splitKeys = splitAlgo.split(numRegions);
-    table = HTU.createTable(hdt, splitKeys);
-
-  }
-
-  protected void initConf() {
-
-    conf.setInt("hfile.format.version", 3);
-    // Disable automatic MOB compaction
-    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
-    // Disable automatic MOB file cleaner chore
-    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
-    // Set minimum age to archive to 10 sec
-    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
-    // Set compacted file discharger interval to a half minAgeToArchive
-    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive 
/ 2);
-  }
-
-  private void loadData(int num) {
-
-    Random r = new Random();
-    try {
-      LOG.info("Started loading {} rows", num);
-      for (int i = 0; i < num; i++) {
-        byte[] key = new byte[32];
-        r.nextBytes(key);
-        Put p = new Put(key);
-        p.addColumn(fam, qualifier, mobVal);
-        table.put(p);
-      }
-      admin.flush(table.getName());
-      LOG.info("Finished loading {} rows", num);
-    } catch (Exception e) {
-      LOG.error("MOB file compaction chore test FAILED", e);
-      fail("MOB file compaction chore test FAILED");
-    }
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    admin.disableTable(hdt.getTableName());
-    admin.deleteTable(hdt.getTableName());
-    HTU.shutdownMiniCluster();
-  }
-
-  public void baseTestMobFileCompaction() throws InterruptedException, 
IOException {
-
-    // Load and flush data 3 times
-    loadData(rows);
-    loadData(rows);
-    loadData(rows);
-    long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 3, num);
-    // Major MOB compact
-    mobCompact(admin, hdt, hcd);
-    // wait until compaction is complete
-    while (admin.getCompactionState(hdt.getTableName()) != 
CompactionState.NONE) {
-      Thread.sleep(100);
-    }
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 4, num);
-    // We have guarantee, that compacted file discharger will run during this 
pause
-    // because it has interval less than this wait time
-    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
-
-    Thread.sleep(minAgeToArchive + 1000);
-    LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    cleanerChore.cleanupObsoleteMobFiles(conf, table.getName());
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions, num);
-
-    long scanned = scanTable();
-    assertEquals(3 * rows, scanned);
-
-  }
-
-  protected abstract void mobCompact(Admin admin2, HTableDescriptor hdt2, 
HColumnDescriptor hcd2)
-    throws IOException, InterruptedException;
-
-  protected long getNumberOfMobFiles(Configuration conf, TableName tableName, 
String family)
-    throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
-    FileStatus[] stat = fs.listStatus(dir);
-    for (FileStatus st : stat) {
-      LOG.debug("MOB Directory content: {}", st.getPath());
-    }
-    LOG.debug("MOB Directory content total files: {}", stat.length);
-
-    return stat.length;
-  }
-
-  protected long scanTable() {
-    try {
-
-      Result result;
-      ResultScanner scanner = table.getScanner(fam);
-      long counter = 0;
-      while ((result = scanner.next()) != null) {
-        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
-        counter++;
-      }
-      return counter;
-    } catch (Exception e) {
-      LOG.error("MOB file compaction test FAILED", e);
-      if (HTU != null) {
-        fail(e.getMessage());
-      } else {
-        System.exit(-1);
-      }
-    }
-    return 0;
-  }
-}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
index 47fcde9a233..816ea29b911 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hbase.mob;
 
-import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
@@ -40,12 +38,13 @@ public class TestMobCompactionOptMode extends 
TestMobCompactionWithDefaults {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
 
-  @BeforeClass
-  public static void configureOptimizedCompaction() throws 
InterruptedException, IOException {
-    HTU.shutdownMiniHBaseCluster();
+  public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
+  protected void additonalConfigSetup() {
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
index 7b6b44d0e31..dfd6435d364 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 
Verifies that number of MOB
  * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 
rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionOptRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
@@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends 
TestMobCompactionWithDe
   private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
+  public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  @BeforeClass
-  public static void configureOptimizedCompactionAndBatches()
-    throws InterruptedException, IOException {
-    HTU.shutdownMiniHBaseCluster();
+  protected void additonalConfigSetup() {
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java
deleted file mode 100644
index 92809ea5511..00000000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Mob file compaction chore in a regular non-batch mode test. 1. Uses default 
(non-batch) mode for
- * regular MOB compaction, 2. Disables periodic MOB compactions, sets minimum 
age to archive to 10
- * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized 
keys, 1000 rows), flushes
- * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob 
files (equals to
- * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that 
number of MOB files in a
- * mob directory is 20 x4 = 80 9. Waits for a period of time larger than 
minimum age to archive 10.
- * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob 
directory is 20. 12 Runs
- * scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-@Category(LargeTests.class)
-public class TestMobCompactionRegularMode extends TestMobCompactionBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobCompactionRegularMode.class);
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class);
-
-  public TestMobCompactionRegularMode() {
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, 
IOException {
-    LOG.info("MOB compaction regular mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction regular mode finished OK");
-
-  }
-
-  @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, 
HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(hdt.getTableName(), hcd.getName());
-  }
-
-}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
index 3d6eaa0a25a..5e2806bdee2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files 
in a mob directory is
  * 20. 12 Runs scanner and checks all 3 * 1000 rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionRegularRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
@@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode 
extends TestMobCompactionWi
   private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
+  public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  @BeforeClass
-  public static void configureCompactionBatches() throws InterruptedException, 
IOException {
-    HTU.shutdownMiniHBaseCluster();
+  protected void additonalConfigSetup() {
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
-    HTU.startMiniHBaseCluster();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
index 23e7c233496..df3eb29525b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -43,18 +45,19 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +71,7 @@ import org.slf4j.LoggerFactory;
  * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob 
directory is 20. 12 Runs
  * scanner and checks all 3 * 1000 rows.
  */
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestMobCompactionWithDefaults {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
@@ -97,8 +101,19 @@ public class TestMobCompactionWithDefaults {
 
   protected MobFileCleanerChore cleanerChore;
 
-  @BeforeClass
-  public static void htuStart() throws Exception {
+  protected Boolean useFileBasedSFT;
+
+  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
+  protected void htuStart() throws Exception {
     HTU = new HBaseTestingUtility();
     conf = HTU.getConfiguration();
     conf.setInt("hfile.format.version", 3);
@@ -111,17 +126,21 @@ public class TestMobCompactionWithDefaults {
     // Set compacted file discharger interval to a half minAgeToArchive
     conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive 
/ 2);
     conf.setBoolean("hbase.regionserver.compaction.enabled", false);
+    if (useFileBasedSFT) {
+      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+        
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
+    additonalConfigSetup();
     HTU.startMiniCluster();
   }
 
-  @AfterClass
-  public static void htuStop() throws Exception {
-    HTU.shutdownMiniCluster();
+  protected void additonalConfigSetup() {
   }
 
   @Before
   public void setUp() throws Exception {
-    tableDescriptor = 
HTU.createModifyableTableDescriptor(test.getMethodName());
+    htuStart();
+    tableDescriptor = 
HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
     admin = HTU.getAdmin();
     cleanerChore = new MobFileCleanerChore();
     familyDescriptor = new 
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
@@ -158,6 +177,7 @@ public class TestMobCompactionWithDefaults {
   public void tearDown() throws Exception {
     admin.disableTable(tableDescriptor.getTableName());
     admin.deleteTable(tableDescriptor.getTableName());
+    HTU.shutdownMiniCluster();
   }
 
   @Test
@@ -173,12 +193,12 @@ public class TestMobCompactionWithDefaults {
 
   @Test
   public void testMobFileCompactionAfterSnapshotClone() throws 
InterruptedException, IOException {
-    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) 
+ "-clone");
     LOG.info("MOB compaction of cloned snapshot, " + description() + " 
started");
     loadAndFlushThreeTimes(rows, table, famStr);
     LOG.debug("Taking snapshot and cloning table {}", table);
-    admin.snapshot(test.getMethodName(), table);
-    admin.cloneSnapshot(test.getMethodName(), clone);
+    admin.snapshot(TestMobUtils.getTableName(test), table);
+    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
     assertEquals("Should have 3 hlinks per region in MOB area from snapshot 
clone", 3 * numRegions,
       getNumberOfMobFiles(clone, famStr));
     mobCompact(admin.getDescriptor(clone), familyDescriptor);
@@ -191,12 +211,12 @@ public class TestMobCompactionWithDefaults {
   @Test
   public void testMobFileCompactionAfterSnapshotCloneAndFlush()
     throws InterruptedException, IOException {
-    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) 
+ "-clone");
     LOG.info("MOB compaction of cloned snapshot after flush, " + description() 
+ " started");
     loadAndFlushThreeTimes(rows, table, famStr);
     LOG.debug("Taking snapshot and cloning table {}", table);
-    admin.snapshot(test.getMethodName(), table);
-    admin.cloneSnapshot(test.getMethodName(), clone);
+    admin.snapshot(TestMobUtils.getTableName(test), table);
+    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
     assertEquals("Should have 3 hlinks per region in MOB area from snapshot 
clone", 3 * numRegions,
       getNumberOfMobFiles(clone, famStr));
     loadAndFlushThreeTimes(rows, clone, famStr);
@@ -275,8 +295,11 @@ public class TestMobCompactionWithDefaults {
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    cleanerChore.cleanupObsoleteMobFiles(conf, table);
+
+    // run cleaner chore on each RS
+    for (ServerName sn : admin.getRegionServers()) {
+      
HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
+    }
 
     assertEquals("After cleaning, we should have 1 MOB file per region based 
on size.", numRegions,
       getNumberOfMobFiles(table, family));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
index bac0408eb4a..6ce127b994a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mob;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -168,14 +171,40 @@ public class TestMobFileCleanerChore {
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
+    // Cleanup
     chore.cleanupObsoleteMobFiles(conf, table.getName());
 
+    // verify that nothing have happened
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(1, num);
+    assertEquals(4, num);
 
     long scanned = scanTable();
     assertEquals(30, scanned);
+
+    // add a MOB file to with a name refering to a non-existing region
+    ColumnFamilyDescriptor familyDescriptor = 
ColumnFamilyDescriptorBuilder.newBuilder(fam)
+      .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
+    Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, 
table.getName(),
+      familyDescriptor, "nonExistentRegion");
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(5, num);
+
+    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+    Thread.sleep(minAgeToArchive + 1000);
+    LOG.info("Cleaning up MOB files");
+    chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+    // check that the extra file got deleted
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4, num);
+
+    FileSystem fs = FileSystem.get(conf);
+    assertFalse(fs.exists(extraMOBFile));
+
+    scanned = scanTable();
+    assertEquals(30, scanned);
+
   }
 
   private long getNumberOfMobFiles(Configuration conf, TableName tableName, 
String family)
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
index 90b3d4085bf..123965c0eca 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
@@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionAsTable;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import 
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -78,12 +82,15 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test mob store compaction
  */
+@RunWith(Parameterized.class)
 @Category(MediumTests.class)
 public class TestMobStoreCompaction {
 
@@ -108,13 +115,30 @@ public class TestMobStoreCompaction {
   private final byte[] STARTROW = Bytes.toBytes(START_KEY);
   private int compactionThreshold;
 
+  private Boolean useFileBasedSFT;
+
+  public TestMobStoreCompaction(Boolean useFileBasedSFT) {
+    this.useFileBasedSFT = useFileBasedSFT;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean> data() {
+    Boolean[] data = { false, true };
+    return Arrays.asList(data);
+  }
+
   private void init(Configuration conf, long mobThreshold) throws Exception {
+    if (useFileBasedSFT) {
+      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+        
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    }
+
     this.conf = conf;
     this.mobCellThreshold = mobThreshold;
     HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
 
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-    htd = UTIL.createTableDescriptor(name.getMethodName());
+    htd = UTIL.createTableDescriptor(TestMobUtils.getTableName(name));
     hcd = new HColumnDescriptor(COLUMN_FAMILY);
     hcd.setMobEnabled(true);
     hcd.setMobThreshold(mobThreshold);
@@ -227,7 +251,7 @@ public class TestMobStoreCompaction {
     Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
     List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
     for (int i = 0; i < compactionThreshold; i++) {
-      Path hpath = new Path(basedir, "hfile" + i);
+      Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", 
""));
       hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
       createHFile(hpath, i, dummyData);
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
index 247dfff77d1..32170e98b35 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
@@ -89,4 +90,8 @@ public class TestMobUtils {
     assertTrue(testTable3Refs.contains("file3a"));
     assertTrue(testTable3Refs.contains("file3b"));
   }
+
+  public static String getTableName(TestName test) {
+    return test.getMethodName().replace("[", "-").replace("]", "");
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
similarity index 65%
copy from 
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
copy to 
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
index bac0408eb4a..98187631d96 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
@@ -22,21 +22,26 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,17 +55,16 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and 
flushes it N times 3. Runs
- * major MOB compaction (N MOB files go to archive) 4. Verifies that number of 
MOB files in a mob
- * directory is N+1 5. Waits for a period of time larger than minimum age to 
archive 6. Runs Mob
- * cleaner chore 7 Verifies that number of MOB files in a mob directory is 1.
+ * major MOB compaction 4. Verifies that number of MOB files in a mob 
directory is N+1 5. Waits for
+ * a period of time larger than minimum age to archive 6. Runs Mob cleaner 
chore 7 Verifies that
+ * every old MOB file referenced from current RS was archived
  */
-@SuppressWarnings("deprecation")
 @Category(MediumTests.class)
-public class TestMobFileCleanerChore {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobFileCleanerChore.class);
+public class TestRSMobFileCleanerChore {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMobFileCleanerChore.class);
+    HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
 
   private HBaseTestingUtility HTU;
 
@@ -72,33 +76,30 @@ public class TestMobFileCleanerChore {
     
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
 
   private Configuration conf;
-  private HTableDescriptor hdt;
-  private HColumnDescriptor hcd;
+  private TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
+  private ColumnFamilyDescriptor familyDescriptor;
   private Admin admin;
   private Table table = null;
-  private MobFileCleanerChore chore;
+  private RSMobFileCleanerChore chore;
   private long minAgeToArchive = 10000;
 
-  public TestMobFileCleanerChore() {
+  public TestRSMobFileCleanerChore() {
   }
 
   @Before
   public void setUp() throws Exception {
     HTU = new HBaseTestingUtility();
-    hdt = HTU.createTableDescriptor("testMobCompactTable");
     conf = HTU.getConfiguration();
 
     initConf();
 
     HTU.startMiniCluster();
     admin = HTU.getAdmin();
-    chore = new MobFileCleanerChore();
-    hcd = new HColumnDescriptor(fam);
-    hcd.setMobEnabled(true);
-    hcd.setMobThreshold(mobLen);
-    hcd.setMaxVersions(1);
-    hdt.addFamily(hcd);
-    table = HTU.createTable(hdt, null);
+    familyDescriptor = 
ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
+      .setMobThreshold(mobLen).setMaxVersions(1).build();
+    tableDescriptor =
+      
HTU.createModifyableTableDescriptor("testMobCompactTable").setColumnFamily(familyDescriptor);
+    table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
   }
 
   private void initConf() {
@@ -140,42 +141,88 @@ public class TestMobFileCleanerChore {
 
   @After
   public void tearDown() throws Exception {
-    admin.disableTable(hdt.getTableName());
-    admin.deleteTable(hdt.getTableName());
+    admin.disableTable(tableDescriptor.getTableName());
+    admin.deleteTable(tableDescriptor.getTableName());
     HTU.shutdownMiniCluster();
   }
 
   @Test
   public void testMobFileCleanerChore() throws InterruptedException, 
IOException {
-
     loadData(0, 10);
     loadData(10, 10);
-    loadData(20, 10);
+    // loadData(20, 10);
     long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(3, num);
+    assertEquals(2, num);
     // Major compact
-    admin.majorCompact(hdt.getTableName(), fam);
+    admin.majorCompact(tableDescriptor.getTableName(), fam);
     // wait until compaction is complete
-    while (admin.getCompactionState(hdt.getTableName()) != 
CompactionState.NONE) {
+    while (admin.getCompactionState(tableDescriptor.getTableName()) != 
CompactionState.NONE) {
       Thread.sleep(100);
     }
 
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(4, num);
+    assertEquals(3, num);
     // We have guarantee, that compcated file discharger will run during this 
pause
     // because it has interval less than this wait time
     LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+    ServerName serverUsed = null;
+    List<RegionInfo> serverRegions = null;
+    for (ServerName sn : admin.getRegionServers()) {
+      serverRegions = admin.getRegions(sn);
+      if (serverRegions != null && serverRegions.size() > 0) {
+        // filtering out non test table regions
+        serverRegions = serverRegions.stream().filter(r -> r.getTable() == 
table.getName())
+          .collect(Collectors.toList());
+        // if such one is found use this rs
+        if (serverRegions.size() > 0) {
+          serverUsed = sn;
+        }
+        break;
+      }
+    }
+
+    chore = 
HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
+
+    chore.chore();
 
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(1, num);
+    assertEquals(3 - serverRegions.size(), num);
 
     long scanned = scanTable();
-    assertEquals(30, scanned);
+    assertEquals(20, scanned);
+
+    // creating a MOB file not referenced from the current RS
+    Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, 
table.getName(),
+      familyDescriptor, "nonExistentRegion");
+
+    // verifying the new MOBfile is added
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4 - serverRegions.size(), num);
+
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue(fs.exists(extraMOBFile));
+
+    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+    Thread.sleep(minAgeToArchive + 1000);
+    LOG.info("Cleaning up MOB files");
+
+    // running chore again
+    chore.chore();
+
+    // the chore should only archive old MOB files that were referenced from 
the current RS
+    // the unrelated MOB file is still there
+    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+    assertEquals(4 - serverRegions.size(), num);
+
+    assertTrue(fs.exists(extraMOBFile));
+
+    scanned = scanTable();
+    assertEquals(20, scanned);
   }
 
   private long getNumberOfMobFiles(Configuration conf, TableName tableName, 
String family)

Reply via email to