This is an automated email from the ASF dual-hosted git repository.

huaxiangsun pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new e595f154d10 HBASE-23723 Ensure MOB compaction works in optimized mode 
after snapshot clone (#4617) (#4704)
e595f154d10 is described below

commit e595f154d108f335c3025dee8ef85b8da586b0f2
Author: huaxiangsun <huaxiang...@apache.org>
AuthorDate: Mon Aug 15 13:16:00 2022 -0700

    HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot 
clone (#4617) (#4704)
    
    * HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot 
clone (#1446)
    
    * Reorganize MOB compaction tests for more reuse.
    * Add tests for mob compaction after snapshot clone operations
    * Note the original table used to write a given mob hfile and use that to 
find it later.
    
    Signed-off-by: Esteban Gutierrez <este...@apache.org>
    
    * spotless:apply to fix HBaseTestingUtility
    
    * Fix error-prone errors
    
    Signed-off-by: Esteban Gutierrez <este...@apache.org>
    Co-authored-by: Sean Busbey <bus...@apache.org>
    
    Signed-off-by: Esteban Gutierrez <este...@apache.org>
    Co-authored-by: Andrew Purtell <apurt...@apache.org>
    Co-authored-by: Sean Busbey <bus...@apache.org>
---
 .../org/apache/hadoop/hbase/PrivateCellUtil.java   |   2 +-
 .../java/org/apache/hadoop/hbase/TableName.java    |  60 +++-
 .../hadoop/hbase/io/hfile/HFilePrettyPrinter.java  |   8 +-
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 127 +++++---
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |   6 +-
 .../hadoop/hbase/mob/MobFileCleanerChore.java      |  41 +--
 .../java/org/apache/hadoop/hbase/mob/MobUtils.java | 150 ++++++++--
 .../hadoop/hbase/regionserver/HMobStore.java       |  55 ++--
 .../hadoop/hbase/regionserver/HStoreFile.java      |   3 +-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  17 +-
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |  23 ++
 .../hadoop/hbase/mob/FaultyMobStoreCompactor.java  |  33 ++-
 .../hadoop/hbase/mob/TestMobCompactionOptMode.java |  35 +--
 .../mob/TestMobCompactionOptRegionBatchMode.java   |  39 ++-
 .../TestMobCompactionRegularRegionBatchMode.java   |  38 +--
 .../hbase/mob/TestMobCompactionWithDefaults.java   | 322 +++++++++++++++++++++
 16 files changed, 739 insertions(+), 220 deletions(-)

diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
index b3e70132dfc..001dfaae6d5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
@@ -916,7 +916,7 @@ public final class PrivateCellUtil {
    * Retrieve Cell's first tag, matching the passed in type
    * @param cell The Cell
    * @param type Type of the Tag to retrieve
-   * @return null if there is no tag of the passed in tag type
+   * @return Optional, empty if there is no tag of the passed in tag type
    */
   public static Optional<Tag> getTag(Cell cell, byte type) {
     boolean bufferBacked = cell instanceof ByteBufferExtendedCell;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index 18eba1ebb53..174a4169c0d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -22,9 +22,12 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * Immutable POJO class for representing a table name. Which is of the form: 
&lt;table
  * namespace&gt;:&lt;table qualifier&gt; Two special namespaces: 1. hbase - 
system namespace, used
@@ -386,28 +389,69 @@ public final class TableName implements 
Comparable<TableName> {
   }
 
   /**
+   * @param fullName will use the entire byte array
    * @throws IllegalArgumentException if fullName equals old root or old meta. 
Some code depends on
    *                                  this. The test is buried in the table 
creation to save on
    *                                  array comparison when we're creating a 
standard table object
    *                                  that will be in the cache.
    */
   public static TableName valueOf(byte[] fullName) throws 
IllegalArgumentException {
+    return valueOf(fullName, 0, fullName.length);
+  }
+
+  /**
+   * @param fullName byte array to look into
+   * @param offset   within said array
+   * @param length   within said array
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(byte[] fullName, int offset, int length)
+    throws IllegalArgumentException {
+    Preconditions.checkArgument(offset >= 0, "offset must be non-negative but 
was %s", offset);
+    Preconditions.checkArgument(offset < fullName.length, "offset (%s) must be 
< array length (%s)",
+      offset, fullName.length);
+    Preconditions.checkArgument(length <= fullName.length,
+      "length (%s) must be <= array length (%s)", length, fullName.length);
     for (TableName tn : tableCache) {
-      if (Arrays.equals(tn.getName(), fullName)) {
+      final byte[] tnName = tn.getName();
+      if (Bytes.equals(tnName, 0, tnName.length, fullName, offset, length)) {
         return tn;
       }
     }
 
-    int namespaceDelimIndex = 
org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
-      .lastIndexOf(fullName, (byte) NAMESPACE_DELIM);
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(fullName, (byte) 
NAMESPACE_DELIM);
 
-    if (namespaceDelimIndex < 0) {
+    if (namespaceDelimIndex < offset) {
       return 
createTableNameIfNecessary(ByteBuffer.wrap(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
-        ByteBuffer.wrap(fullName));
+        ByteBuffer.wrap(fullName, offset, length));
+    } else {
+      return createTableNameIfNecessary(ByteBuffer.wrap(fullName, offset, 
namespaceDelimIndex),
+        ByteBuffer.wrap(fullName, namespaceDelimIndex + 1, length - 
(namespaceDelimIndex + 1)));
+    }
+  }
+
+  /**
+   * @param fullname of a table, possibly with a leading namespace and ':' as 
delimiter.
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(ByteBuffer fullname) {
+    fullname = fullname.duplicate();
+    fullname.mark();
+    boolean miss = true;
+    while (fullname.hasRemaining() && miss) {
+      miss = ((byte) NAMESPACE_DELIM) != fullname.get();
+    }
+    if (miss) {
+      fullname.reset();
+      return valueOf(null, fullname);
     } else {
-      return createTableNameIfNecessary(ByteBuffer.wrap(fullName, 0, 
namespaceDelimIndex),
-        ByteBuffer.wrap(fullName, namespaceDelimIndex + 1,
-          fullName.length - (namespaceDelimIndex + 1)));
+      ByteBuffer qualifier = fullname.slice();
+      int delimiterIndex = fullname.position() - 1;
+      fullname.reset();
+      // changing variable name for clarity
+      ByteBuffer namespace = fullname.duplicate();
+      namespace.limit(delimiterIndex);
+      return valueOf(namespace, qualifier);
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 842ca8389c8..cb74ed39a85 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -41,6 +41,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TimeZone;
@@ -422,17 +423,16 @@ public class HFilePrettyPrinter extends Configured 
implements Tool {
       }
       // check if mob files are missing.
       if (checkMobIntegrity && MobUtils.isMobReferenceCell(cell)) {
-        Tag tnTag = MobUtils.getTableNameTag(cell);
-        if (tnTag == null) {
+        Optional<TableName> tn = MobUtils.getTableName(cell);
+        if (!tn.isPresent()) {
           System.err.println(
             "ERROR, wrong tag format in mob reference cell " + 
CellUtil.getCellKeyAsString(cell));
         } else if (!MobUtils.hasValidMobRefCellValue(cell)) {
           System.err.println(
             "ERROR, wrong value format in mob reference cell " + 
CellUtil.getCellKeyAsString(cell));
         } else {
-          TableName tn = TableName.valueOf(Tag.cloneValue(tnTag));
           String mobFileName = MobUtils.getMobFileName(cell);
-          boolean exist = mobFileExists(fs, tn, mobFileName,
+          boolean exist = mobFileExists(fs, tn.get(), mobFileName,
             Bytes.toString(CellUtil.cloneFamily(cell)), foundMobFiles, 
missingMobFiles);
           if (!exist) {
             // report error
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 5f02b227350..6de12111a82 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -26,9 +26,9 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +37,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
@@ -62,7 +64,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * Compact passed set of files in the mob-enabled column family.
@@ -82,12 +87,8 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
    * compaction process.
    */
 
-  static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
-    @Override
-    protected Set<String> initialValue() {
-      return new HashSet<String>();
-    }
-  };
+  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
+    ThreadLocal.withInitial(HashMultimap::create);
 
   /*
    * Is it user or system-originated request.
@@ -192,34 +193,72 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     // Check if I/O optimized MOB compaction
     if (ioOptimizedMode) {
       if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
-        Path mobDir =
-          MobUtils.getMobFamilyPath(conf, store.getTableName(), 
store.getColumnFamilyName());
-        List<Path> mobFiles = 
MobUtils.getReferencedMobFiles(request.getFiles(), mobDir);
-        // reset disableIO
-        disableIO.set(Boolean.FALSE);
-        if (mobFiles.size() > 0) {
-          calculateMobLengthMap(mobFiles);
+        try {
+          final SetMultimap<TableName, String> mobRefs = 
request.getFiles().stream().map(file -> {
+            byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+            ImmutableSetMultimap.Builder<TableName, String> builder;
+            if (value == null) {
+              builder = ImmutableSetMultimap.builder();
+            } else {
+              try {
+                builder = MobUtils.deserializeMobFileRefs(value);
+              } catch (RuntimeException exception) {
+                throw new RuntimeException("failure getting mob references for 
hfile " + file,
+                  exception);
+              }
+            }
+            return builder;
+          }).reduce((a, b) -> 
a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
+          // reset disableIO
+          disableIO.set(Boolean.FALSE);
+          if (!mobRefs.isEmpty()) {
+            calculateMobLengthMap(mobRefs);
+          }
+          LOG.info(
+            "Table={} cf={} region={}. I/O optimized MOB compaction. "
+              + "Total referenced MOB files: {}",
+            tableName, familyName, regionName, mobRefs.size());
+        } catch (RuntimeException exception) {
+          throw new IOException("Failed to get list of referenced hfiles for 
request " + request,
+            exception);
         }
-        LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "
-          + "Total referenced MOB files: {}", tableName, familyName, 
regionName, mobFiles.size());
       }
     }
 
     return compact(request, scannerFactory, writerFactory, 
throughputController, user);
   }
 
-  private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
+  /**
+   * @param mobRefs multimap of original table name -> mob hfile
+   */
+  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) 
throws IOException {
     FileSystem fs = store.getFileSystem();
     HashMap<String, Long> map = mobLengthMap.get();
     map.clear();
-    for (Path p : mobFiles) {
-      if (MobFileName.isOldMobFileName(p.getName())) {
+    for (Map.Entry<TableName, String> reference : mobRefs.entries()) {
+      final TableName table = reference.getKey();
+      final String mobfile = reference.getValue();
+      if (MobFileName.isOldMobFileName(mobfile)) {
         disableIO.set(Boolean.TRUE);
       }
-      FileStatus st = fs.getFileStatus(p);
-      long size = st.getLen();
-      LOG.debug("Referenced MOB file={} size={}", p, size);
-      map.put(p.getName(), fs.getFileStatus(p).getLen());
+      List<Path> locations = mobStore.getLocations(table);
+      for (Path p : locations) {
+        try {
+          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
+          long size = st.getLen();
+          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
+          map.put(mobfile, size);
+          break;
+        } catch (FileNotFoundException exception) {
+          LOG.debug("Mob file {} was not in location {}. May have other 
locations to try.", mobfile,
+            p);
+        }
+      }
+      if (!map.containsKey(mobfile)) {
+        throw new FileNotFoundException("Could not find mob file " + mobfile + 
" in the list of "
+          + "expected locations: " + locations);
+      }
+
     }
   }
 
@@ -395,8 +434,15 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
                     // We leave large MOB file as is (is not compacted),
                     // then we update set of MOB file references
                     // and append mob cell directly to the store's writer
-                    mobRefSet.get().add(fName);
-                    writer.append(mobCell);
+                    Optional<TableName> refTable = MobUtils.getTableName(c);
+                    if (refTable.isPresent()) {
+                      mobRefSet.get().put(refTable.get(), fName);
+                      writer.append(c);
+                    } else {
+                      throw new IOException("MOB cell did not contain a 
tablename "
+                        + "tag. should not be possible. see ref guide on mob 
troubleshooting. "
+                        + "store=" + getStoreInfo() + " cell=" + c);
+                    }
                   }
                 }
               } else {
@@ -444,9 +490,15 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
             if (MobUtils.hasValidMobRefCellValue(c)) {
               // We do not check mobSizeThreshold during normal compaction,
               // leaving it to a MOB compaction run
-              writer.append(c);
-              // Add MOB reference to a MOB reference set
-              mobRefSet.get().add(MobUtils.getMobFileName(c));
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), 
MobUtils.getMobFileName(c));
+                writer.append(c);
+              } else {
+                throw new IOException("MOB cell did not contain a tablename "
+                  + "tag. should not be possible. see ref guide on mob 
troubleshooting. " + "store="
+                  + getStoreInfo() + " cell=" + c);
+              }
             } else {
               String errMsg = String.format("Corrupted MOB reference: %s", 
c.toString());
               throw new IOException(errMsg);
@@ -525,7 +577,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
       throughputController.finish(compactionName);
       if (!finished && mobFileWriter != null) {
         // Remove all MOB references because compaction failed
-        mobRefSet.get().clear();
+        clearThreadLocals();
         // Abort writer
         LOG.debug("Aborting writer for {} because of a compaction failure, 
Store {}",
           mobFileWriter.getPath(), getStoreInfo());
@@ -543,16 +595,13 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     return true;
   }
 
-  private String getStoreInfo() {
+  protected String getStoreInfo() {
     return String.format("[table=%s family=%s region=%s]", 
store.getTableName().getNameAsString(),
       store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
   }
 
   private void clearThreadLocals() {
-    Set<String> set = mobRefSet.get();
-    if (set != null) {
-      set.clear();
-    }
+    mobRefSet.get().clear();
     HashMap<String, Long> map = mobLengthMap.get();
     if (map != null) {
       map.clear();
@@ -567,7 +616,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
       LOG.debug("New MOB writer created={} store={}", 
mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), 
mobFileWriter.getPath().getName());
       return mobFileWriter;
     } catch (IOException e) {
       // Bailing out
@@ -599,7 +648,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
         LOG.debug("Aborting writer for {} because there are no MOB cells, 
store={}",
           mobFileWriter.getPath(), getStoreInfo());
         // Remove MOB file from reference set
-        mobRefSet.get().remove(mobFileWriter.getPath().getName());
+        mobRefSet.get().remove(store.getTableName(), 
mobFileWriter.getPath().getName());
         abortWriter(mobFileWriter);
       }
     } else {
@@ -612,9 +661,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), 
request.getFiles());
-    // Append MOB references
-    Set<String> refSet = mobRefSet.get();
-    writer.appendMobMetadata(refSet);
+    writer.appendMobMetadata(mobRefSet.get());
     writer.close();
     clearThreadLocals();
     return newFiles;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index baec21a593c..a7f2ecdf242 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
@@ -47,6 +48,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+
 /**
  * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. 
If the store is not a
  * mob store, the flusher flushes the MemStore the same with 
DefaultStoreFlusher, If the store is a
@@ -277,7 +280,8 @@ public class DefaultMobStoreFlusher extends 
DefaultStoreFlusher {
     // The hfile is current up to and including cacheFlushSeqNum.
     status.setStatus("Flushing " + store + ": appending metadata");
     writer.appendMetadata(cacheFlushSeqNum, false);
-    writer.appendMobMetadata(mobRefSet.get());
+    writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String> builder()
+      .putAll(store.getTableName(), mobRefSet.get()).build());
     status.setStatus("Flushing " + store + ": closing flushed file");
     writer.close();
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 24ac5e2a0ff..e2c1f8961de 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +52,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the 
expired and obsolete
  * (files which have no active references to) mob files.
@@ -209,27 +210,27 @@ public class MobFileCleanerChore extends ScheduledChore {
                 byte[] bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                 // close store file to avoid memory leaks
                 sf.closeStoreFile(true);
-                if (mobRefData == null && bulkloadMarkerData == null) {
-                  LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
-                    + "can not proceed until all old files will be 
MOB-compacted.", pp);
-                  return;
-                } else if (mobRefData == null && bulkloadMarkerData != null) {
-                  LOG.debug("Skipping file without MOB references (bulkloaded 
file):{}", pp);
-                  continue;
-                }
-                // mobRefData will never be null here, but to make FindBugs 
happy
-                if (mobRefData != null && mobRefData.length > 1) {
-                  // if length = 1 means NULL, that there are no MOB references
-                  // in this store file, but the file was created by new MOB 
code
-                  String[] mobs = new String(mobRefData).split(",");
-                  if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found: {} mob references: {}", mobs.length, 
Arrays.toString(mobs));
+                if (mobRefData == null) {
+                  if (bulkloadMarkerData == null) {
+                    LOG.warn("Found old store file with no MOB_FILE_REFS: {} - 
"
+                      + "can not proceed until all old files will be 
MOB-compacted.", pp);
+                    return;
                   } else {
-                    LOG.debug("Found: {} mob references", mobs.length);
+                    LOG.debug("Skipping file without MOB references 
(bulkloaded file):{}", pp);
+                    continue;
                   }
-                  regionMobs.addAll(Arrays.asList(mobs));
-                } else {
-                  LOG.debug("File {} does not have mob references", 
currentPath);
+                }
+                // file may or may not have MOB references, but was created by 
the distributed
+                // mob compaction code.
+                try {
+                  SetMultimap<TableName, String> mobs =
+                    MobUtils.deserializeMobFileRefs(mobRefData).build();
+                  LOG.debug("Found {} mob references for store={}", 
mobs.size(), sf);
+                  LOG.trace("Specific mob references found for store={} : {}", 
sf, mobs);
+                  regionMobs.addAll(mobs.values());
+                } catch (RuntimeException exception) {
+                  throw new IOException("failure getting mob references for 
hfile " + sf,
+                    exception);
                 }
               }
             } catch (FileNotFoundException e) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index aac75835164..f408c9642a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -22,17 +22,16 @@ import static 
org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UP
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +69,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
 /**
  * The mob utilities
  */
@@ -131,14 +133,51 @@ public final class MobUtils {
    * @param cell The current cell.
    * @return The table name tag.
    */
-  public static Tag getTableNameTag(Cell cell) {
+  private static Optional<Tag> getTableNameTag(Cell cell) {
+    Optional<Tag> tag = Optional.empty();
     if (cell.getTagsLength() > 0) {
-      Optional<Tag> tag = PrivateCellUtil.getTag(cell, 
TagType.MOB_TABLE_NAME_TAG_TYPE);
-      if (tag.isPresent()) {
-        return tag.get();
+      tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
+    }
+    return tag;
+  }
+
+  /**
+   * Gets the table name from when this cell was written into a mob hfile as a 
string.
+   * @param cell to extract tag from
+   * @return table name as a string. empty if the tag is not found.
+   */
+  public static Optional<String> getTableNameString(Cell cell) {
+    Optional<Tag> tag = getTableNameTag(cell);
+    Optional<String> name = Optional.empty();
+    if (tag.isPresent()) {
+      name = Optional.of(Tag.getValueAsString(tag.get()));
+    }
+    return name;
+  }
+
+  /**
+   * Get the table name from when this cell was written into a mob hfile as a 
TableName.
+   * @param cell to extract tag from
+   * @return name of table as a TableName. empty if the tag is not found.
+   */
+  public static Optional<TableName> getTableName(Cell cell) {
+    Optional<Tag> maybe = getTableNameTag(cell);
+    Optional<TableName> name = Optional.empty();
+    if (maybe.isPresent()) {
+      final Tag tag = maybe.get();
+      if (tag.hasArray()) {
+        name = Optional
+          .of(TableName.valueOf(tag.getValueArray(), tag.getValueOffset(), 
tag.getValueLength()));
+      } else {
+        // TODO ByteBuffer handling in tags looks busted. revisit.
+        ByteBuffer buffer = tag.getValueByteBuffer().duplicate();
+        buffer.mark();
+        buffer.position(tag.getValueOffset());
+        buffer.limit(tag.getValueOffset() + tag.getValueLength());
+        name = Optional.of(TableName.valueOf(buffer));
       }
     }
-    return null;
+    return name;
   }
 
   /**
@@ -395,8 +434,7 @@ public final class MobUtils {
 
   /**
    * Gets the RegionInfo of the mob files. This is a dummy region. The mob 
files are not saved in a
-   * region in HBase. This is only used in mob snapshot. It's internally used 
only. n * @return A
-   * dummy mob region info.
+   * region in HBase. It's internally used only.
    */
   public static RegionInfo getMobRegionInfo(TableName tableName) {
     return 
RegionInfoBuilder.newBuilder(tableName).setStartKey(MobConstants.MOB_REGION_NAME_BYTES)
@@ -682,27 +720,81 @@ public final class MobUtils {
   }
 
   /**
-   * Get list of referenced MOB files from a given collection of store files
-   * @param storeFiles store files
-   * @param mobDir     MOB file directory
-   * @return list of MOB file paths
-   */
-
-  public static List<Path> getReferencedMobFiles(Collection<HStoreFile> 
storeFiles, Path mobDir) {
-
-    Set<String> mobSet = new HashSet<String>();
-    for (HStoreFile sf : storeFiles) {
-      byte[] value = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-      if (value != null && value.length > 1) {
-        String s = Bytes.toString(value);
-        String[] all = s.split(",");
-        Collections.addAll(mobSet, all);
+   * Serialize a set of referenced mob hfiles
+   * @param mobRefSet to serialize, may be null
+   * @return byte array suitable for e.g. store file metadata. will not be null
+   */
+  public static byte[] serializeMobFileRefs(SetMultimap<TableName, String> mobRefSet) {
+    if (mobRefSet != null && mobRefSet.size() > 0) {
+      // Here we rely on the fact that '/' and ',' are not allowed in either table names or hfile
+      // names for serialization. Separators are written before every entry except the first.
+      //
+      // exampleTable/filename1,filename2//example:table/filename5//otherTable/filename3,filename4
+      //
+      // to approximate the needed capacity we use the fact that there will usually be 1 table name
+      // and each mob filename is around 105 bytes. we pick an arbitrary number to cover "most"
+      // single table name lengths
+      StringBuilder sb = new StringBuilder(100 + mobRefSet.size() * 105);
+      boolean doubleSlash = false;
+      for (TableName tableName : mobRefSet.keySet()) {
+        if (doubleSlash) {
+          sb.append("//"); // separates this table's entry from the previous one
+        } else {
+          doubleSlash = true;
+        }
+        sb.append(tableName).append("/");
+        boolean comma = false;
+        for (String refs : mobRefSet.get(tableName)) {
+          if (comma) {
+            sb.append(","); // separates this file name from the previous one
+          } else {
+            comma = true;
+          }
+          sb.append(refs);
+        }
       }
+      return Bytes.toBytes(sb.toString());
+    } else {
+      return HStoreFile.NULL_VALUE;
     }
-    List<Path> retList = new ArrayList<Path>();
-    for (String name : mobSet) {
-      retList.add(new Path(mobDir, name));
+  }
+
+  /**
+   * Deserialize the set of referenced mob hfiles from store file metadata. Input longer than one
+   * byte is parsed as {@code table/file,file//table/file}; anything else is treated as the
+   * placeholder for "no entries".
+   * @param bytes compatibly serialized data. can not be null
+   * @return a builder for a setmultimap of original table to hfile names. will be empty if no
+   *         values.
+   * @throws IllegalStateException if there are values but no table name
+   */
+  public static ImmutableSetMultimap.Builder<TableName, String> 
deserializeMobFileRefs(byte[] bytes)
+    throws IllegalStateException {
+    ImmutableSetMultimap.Builder<TableName, String> map = 
ImmutableSetMultimap.builder();
+    if (bytes.length > 1) {
+      // TODO avoid turning the tablename pieces in to strings.
+      String s = Bytes.toString(bytes);
+      String[] tables = s.split("//"); // one element per table entry
+      for (String tableEnc : tables) {
+        final int delim = tableEnc.indexOf('/'); // end of the table name
+        if (delim <= 0) { // no '/' found, or nothing in front of it
+          throw new IllegalStateException("MOB reference data does not match 
expected encoding: "
+            + "no table name included before list of mob refs.");
+        }
+        TableName table = TableName.valueOf(tableEnc.substring(0, delim));
+        String[] refs = tableEnc.substring(delim + 1).split(",");
+        map.putAll(table, refs);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        // array length 1 should be the NULL_VALUE.
+        if (!Arrays.equals(HStoreFile.NULL_VALUE, bytes)) {
+          LOG.debug(
+            "Serialized MOB file refs array was treated as the placeholder 'no 
entries' but"
+              + " didn't have the expected placeholder byte. expected={} and 
actual={}",
+            Arrays.toString(HStoreFile.NULL_VALUE), Arrays.toString(bytes));
+        }
+      }
+
     }
-    return retList;
+    return map;
   }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index c78efd0ce5f..ac69eb8d324 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -90,7 +90,7 @@ public class HMobStore extends HStore {
   private AtomicLong mobFlushedCellsSize = new AtomicLong();
   private AtomicLong mobScanCellsCount = new AtomicLong();
   private AtomicLong mobScanCellsSize = new AtomicLong();
-  private Map<String, List<Path>> map = new ConcurrentHashMap<>();
+  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
   private final IdLock keyLock = new IdLock();
   // When we add a MOB reference cell to the HFile, we will add 2 tags along 
with it
   // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote 
this this cell is not
@@ -112,7 +112,7 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDescriptor().getTableName();
     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
       MobUtils.getMobRegionInfo(tn).getEncodedName(), 
family.getNameAsString()));
-    map.put(Bytes.toString(tn.getName()), locations);
+    map.put(tn, locations);
     List<Tag> tags = new ArrayList<>(2);
     tags.add(MobConstants.MOB_REF_TAG);
     Tag tableNameTag =
@@ -304,26 +304,9 @@ public class HMobStore extends HStore {
     MobCell mobCell = null;
     if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Tag tableNameTag = MobUtils.getTableNameTag(reference);
-      if (tableNameTag != null) {
-        String tableNameString = Tag.getValueAsString(tableNameTag);
-        List<Path> locations = map.get(tableNameString);
-        if (locations == null) {
-          IdLock.Entry lockEntry = 
keyLock.getLockEntry(tableNameString.hashCode());
-          try {
-            locations = map.get(tableNameString);
-            if (locations == null) {
-              locations = new ArrayList<>(2);
-              TableName tn = TableName.valueOf(tableNameString);
-              locations.add(MobUtils.getMobFamilyPath(conf, tn, 
getColumnFamilyName()));
-              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
-                MobUtils.getMobRegionInfo(tn).getEncodedName(), 
getColumnFamilyName()));
-              map.put(tableNameString, locations);
-            }
-          } finally {
-            keyLock.releaseLockEntry(lockEntry);
-          }
-        }
+      Optional<TableName> tableName = MobUtils.getTableName(reference);
+      if (tableName.isPresent()) {
+        List<Path> locations = getLocations(tableName.get());
         mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
           readEmptyValueOnMobCellMiss);
       }
@@ -346,6 +329,32 @@ public class HMobStore extends HStore {
     return mobCell;
   }
 
+  /**
+   * Look up, computing and caching on first use, the directories that may hold mob hfiles written
+   * for the given table in this store's column family.
+   * @param tableName to look up locations for, can not be null
+   * @return a list of locations in order of working dir, archive dir. will not be null.
+   */
+  public List<Path> getLocations(TableName tableName) throws IOException {
+    final List<Path> cached = map.get(tableName);
+    if (cached != null) {
+      return cached;
+    }
+    IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
+    try {
+      // look again now that we hold the lock; another caller may have filled the entry meanwhile
+      List<Path> locations = map.get(tableName);
+      if (locations == null) {
+        final String family = getColumnFamilyDescriptor().getNameAsString();
+        final Path workingDir = MobUtils.getMobFamilyPath(conf, tableName, family);
+        final Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, tableName,
+          MobUtils.getMobRegionInfo(tableName).getEncodedName(), family);
+        locations = new ArrayList<>(2);
+        locations.add(workingDir);
+        locations.add(archiveDir);
+        map.put(tableName, locations);
+      }
+      return locations;
+    } finally {
+      keyLock.releaseLockEntry(lockEntry);
+    }
+  }
+
   /**
    * Reads the cell from a mob file. The mob file might be located in 
different directories. 1. The
    * working directory. 2. The archive directory. Reads the cell from the 
files located in both of
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 11685268480..b3bd2898e34 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -323,7 +323,8 @@ public class HStoreFile implements StoreFile {
   }
 
   /**
-   * Only used by the Striped Compaction Policy n * @return value associated 
with the metadata key
+   * @param key to look up
+   * @return value associated with the metadata key
    */
   public byte[] getMetadataValue(byte[] key) {
     return metadataMap.get(key);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index a5399cb07bb..de32c270565 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -26,7 +26,6 @@ import static 
org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_K
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.NULL_VALUE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 
 import java.io.IOException;
@@ -40,7 +39,6 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,11 +46,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.BloomContext;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterUtil;
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
@@ -243,17 +244,11 @@ public class StoreFileWriter implements CellSink, 
ShipperListener {
 
   /**
    * Appends MOB - specific metadata (even if it is empty)
-   * @param mobRefSet - set of MOB file names
+   * @param mobRefSet - original table -> set of MOB file names
    * @throws IOException problem writing to FS
    */
-  public void appendMobMetadata(Set<String> mobRefSet) throws IOException {
-    if (mobRefSet != null && mobRefSet.size() > 0) {
-      String sb = StringUtils.join(mobRefSet, ",");
-      byte[] bytes = Bytes.toBytes(sb.toString());
-      writer.appendFileInfo(MOB_FILE_REFS, bytes);
-    } else {
-      writer.appendFileInfo(MOB_FILE_REFS, NULL_VALUE);
-    }
+  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
+    // serialization also covers the null/empty case, so the key is always written
+    final byte[] serializedRefs = MobUtils.serializeMobFileRefs(mobRefSet);
+    writer.appendFileInfo(MOB_FILE_REFS, serializedRefs);
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index f94cae51953..40975e626be 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1906,6 +1906,29 @@ public class HBaseTestingUtility extends 
HBaseZKTestingUtility {
   public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, 
FIRST_CHAR };
   public static final String START_KEY = new String(START_KEY_BYTES, 
HConstants.UTF8_CHARSET);
 
+  /**
+   * Create a modifyable table descriptor for the named table using the default min versions,
+   * {@code MAXVERSIONS}, {@code HConstants.FOREVER} as ttl and the default keep-deleted setting.
+   * @param name table name as a string
+   * @return descriptor built by the five argument overload
+   */
+  public TableDescriptorBuilder.ModifyableTableDescriptor
+    createModifyableTableDescriptor(final String name) {
+    final TableName tableName = TableName.valueOf(name);
+    return createModifyableTableDescriptor(tableName,
+      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
+      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
+  }
+
+  /**
+   * Create a modifyable table descriptor with one column family for each of {@code fam1},
+   * {@code fam2} and {@code fam3}, every family configured with the given settings and with the
+   * block cache disabled.
+   * @param name        table name
+   * @param minVersions min versions for every family
+   * @param versions    max versions for every family
+   * @param ttl         time to live for every family
+   * @param keepDeleted keep-deleted-cells setting for every family
+   * @return descriptor for the table
+   */
+  public TableDescriptorBuilder.ModifyableTableDescriptor createModifyableTableDescriptor(
+    final TableName name, final int minVersions, final int versions, final int ttl,
+    KeepDeletedCells keepDeleted) {
+    final TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(name);
+    final byte[][] families = { fam1, fam2, fam3 };
+    for (int i = 0; i < families.length; i++) {
+      final ColumnFamilyDescriptorBuilder familyBuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(families[i]).setMinVersions(minVersions)
+          .setMaxVersions(versions).setKeepDeletedCells(keepDeleted).setBlockCacheEnabled(false)
+          .setTimeToLive(ttl);
+      if (isNewVersionBehaviorEnabled()) {
+        familyBuilder.setNewVersionBehavior(true);
+      }
+      tableBuilder.setColumnFamily(familyBuilder.build());
+    }
+    return new TableDescriptorBuilder.ModifyableTableDescriptor(name, tableBuilder.build());
+  }
+
   /**
    * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
    *             {@link #createTableDescriptor(TableName, int, int, int, 
KeepDeletedCells)} instead.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 3a4c18dd7ea..f19289dca75 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -23,14 +23,15 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -116,8 +117,6 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
       }
     }
 
-    FileSystem fs = store.getFileSystem();
-
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
@@ -169,7 +168,7 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
         // Add the only reference we get for compact MOB case
         // because new store file will have only one MOB reference
         // in this case - of newly compacted MOB file
-        mobRefSet.get().add(mobFileWriter.getPath().getName());
+        mobRefSet.get().put(store.getTableName(), 
mobFileWriter.getPath().getName());
       }
       do {
         hasMore = scanner.next(cells, scannerContext);
@@ -242,9 +241,15 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
               if (size > mobSizeThreshold) {
                 // If the value size is larger than the threshold, it's 
regarded as a mob. Since
                 // its value is already in the mob file, directly write this 
cell to the store file
-                writer.append(c);
-                // Add MOB reference to a set
-                mobRefSet.get().add(MobUtils.getMobFileName(c));
+                Optional<TableName> refTable = MobUtils.getTableName(c);
+                if (refTable.isPresent()) {
+                  mobRefSet.get().put(refTable.get(), 
MobUtils.getMobFileName(c));
+                  writer.append(c);
+                } else {
+                  throw new IOException("MOB cell did not contain a tablename "
+                    + "tag. should not be possible. see ref guide on mob 
troubleshooting. "
+                    + "store=" + getStoreInfo() + " cell=" + c);
+                }
               } else {
                 // If the value is not larger than the threshold, it's not 
regarded a mob. Retrieve
                 // the mob cell from the mob file, and write it back to the 
store file.
@@ -260,9 +265,15 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
                   // directly write the cell to the store file, and leave it 
to be handled by the
                   // next compaction.
                   LOG.error("Empty value for: " + c);
-                  writer.append(c);
-                  // Add MOB reference to a set
-                  mobRefSet.get().add(MobUtils.getMobFileName(c));
+                  Optional<TableName> refTable = MobUtils.getTableName(c);
+                  if (refTable.isPresent()) {
+                    mobRefSet.get().put(refTable.get(), 
MobUtils.getMobFileName(c));
+                    writer.append(c);
+                  } else {
+                    throw new IOException("MOB cell did not contain a 
tablename "
+                      + "tag. should not be possible. see ref guide on mob 
troubleshooting. "
+                      + "store=" + getStoreInfo() + " cell=" + c);
+                  }
                 }
               }
             } else {
@@ -285,7 +296,7 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
             cellsCountCompactedToMob++;
             cellsSizeCompactedToMob += c.getValueLength();
             // Add ref we get for compact MOB case
-            mobRefSet.get().add(mobFileWriter.getPath().getName());
+            mobRefSet.get().put(store.getTableName(), 
mobFileWriter.getPath().getName());
           }
 
           int len = c.getSerializedSize();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
index cfe2db13ca3..47fcde9a233 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
@@ -19,15 +19,10 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Mob file compaction chore in a generational non-batch mode test. 1. Uses 
default (non-batch) mode
@@ -39,37 +34,23 @@ import org.slf4j.LoggerFactory;
  * than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that 
number of MOB files in a
  * mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
  */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptMode extends TestMobCompactionBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobCompactionOptMode.class);
+public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
 
-  public TestMobCompactionOptMode() {
-  }
-
-  @Override
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompaction() throws 
InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster(); // stop the cluster before editing conf
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, 
IOException {
-    LOG.info("MOB compaction generational (non-batch) mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction generational (non-batch) mode finished OK");
-
+    HTU.startMiniHBaseCluster(); // start it again after the conf changes above
   }
 
   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, 
HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(hdt.getTableName(), hcd.getName());
+  protected String description() {
+    return "generational (non-batch) mode"; // label for this test's compaction mode
   }
 
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
index 7ea74a1617a..7b6b44d0e31 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,47 +39,43 @@ import org.slf4j.LoggerFactory;
  * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 
Verifies that number of MOB
  * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 
rows.
  */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionBase 
{
+public class TestMobCompactionOptRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestMobCompactionOptRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptRegionBatchMode.class);
 
-  private int batchSize = 7;
+  private static final int batchSize = 7; // static so the @BeforeClass hook below can read it
   private MobFileCompactionChore compactionChore;
 
-  public TestMobCompactionOptRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompactionAndBatches()
+    throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster(); // stop the cluster before editing conf
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, 
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
+    HTU.startMiniHBaseCluster(); // start it again after the conf changes above
   }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, 
IOException {
-    LOG.info("MOB compaction chore generational batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore generational batch mode finished OK");
-
+  @Override
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, 
InterruptedException {
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
+    compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, 
familyDescriptor);
   }
 
   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, 
HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
-    compactionChore.performMajorCompactionInBatches(admin, hdt, hcd);
+  protected String description() {
+    return "generational batch mode"; // label for this test's compaction mode
   }
 
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
index 9b8006d1be5..3d6eaa0a25a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,45 +39,40 @@ import org.slf4j.LoggerFactory;
  * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files 
in a mob directory is
  * 20. 12 Runs scanner and checks all 3 * 1000 rows.
  */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionRegularRegionBatchMode extends 
TestMobCompactionBase {
+public class TestMobCompactionRegularRegionBatchMode extends 
TestMobCompactionWithDefaults {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestMobCompactionRegularRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionRegularRegionBatchMode.class);
 
-  private int batchSize = 7;
+  private static final int batchSize = 7; // static so the @BeforeClass hook below can read it
   private MobFileCompactionChore compactionChore;
 
-  public TestMobCompactionRegularRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureCompactionBatches() throws InterruptedException, 
IOException {
+    HTU.shutdownMiniHBaseCluster(); // stop the cluster before editing conf
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 
batchSize);
+    HTU.startMiniHBaseCluster(); // start it again after the conf change above
   }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, 
IOException {
-    LOG.info("MOB compaction chore regular batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore regular batch mode finished OK");
-
+  @Override
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, 
InterruptedException {
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
+    compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, 
familyDescriptor);
   }
 
   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, 
HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
-    compactionChore.performMajorCompactionInBatches(admin, hdt, hcd);
+  protected String description() {
+    return "regular batch mode"; // label for this test's compaction mode
   }
 
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
new file mode 100644
index 00000000000..23e7c233496
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base test for MOB file compaction. As written here it compacts through
+ * {@link Admin#majorCompact(TableName, byte[])} ("regular mode"); the compaction call itself is
+ * isolated in {@link #mobCompactImpl(TableDescriptor, ColumnFamilyDescriptor)} so that it can be
+ * overridden.
+ * <ol>
+ * <li>(Optional; not done by this class itself) Enables batch mode for regular MOB compaction,
+ * sets batch size to 7 regions.</li>
+ * <li>Disables periodic MOB compactions, sets minimum age to archive to 10 sec.</li>
+ * <li>Creates MOB table with 20 regions.</li>
+ * <li>Loads MOB data (randomized keys, 1000 rows), flushes data.</li>
+ * <li>Repeats the previous step two more times.</li>
+ * <li>Verifies that we have 20 * 3 = 60 mob files (equals to number of regions x 3).</li>
+ * <li>Runs major MOB compaction.</li>
+ * <li>Verifies that number of MOB files in a mob directory is 20 x 4 = 80.</li>
+ * <li>Waits for a period of time larger than minimum age to archive.</li>
+ * <li>Runs Mob cleaner chore.</li>
+ * <li>Verifies that number of MOB files in a mob directory is 20.</li>
+ * <li>Runs scanner and checks all 3 * 1000 rows.</li>
+ * </ol>
+ * Two further tests repeat the same flow against a table cloned from a snapshot, with and without
+ * additional writes to the clone before compacting.
+ */
+@Category(LargeTests.class)
+public class TestMobCompactionWithDefaults {
+  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
+
+  protected static HBaseTestingUtility HTU;
+  protected static Configuration conf;
+  /** Minimum age to archive, in milliseconds; also drives the pause before cleanup. */
+  protected static long minAgeToArchive = 10000;
+
+  protected final static String famStr = "f1";
+  protected final static byte[] fam = Bytes.toBytes(famStr);
+  protected final static byte[] qualifier = Bytes.toBytes("q1");
+  /** MOB threshold applied to the column family; {@link #mobVal} is longer than this. */
+  protected final static long mobLen = 10;
+  protected final static byte[] mobVal = Bytes
+    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
+
+  @Rule
+  public TestName test = new TestName();
+  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
+  protected ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
+  protected Admin admin;
+  protected TableName table = null;
+  protected int numRegions = 20;
+  protected int rows = 1000;
+
+  protected MobFileCleanerChore cleanerChore;
+
+  /**
+   * Configures and starts the mini cluster shared by every test in the class. Automatic MOB
+   * compaction, the automatic MOB cleaner and region server compactions are all switched off so
+   * that the tests decide when each of them runs.
+   */
+  @BeforeClass
+  public static void htuStart() throws Exception {
+    HTU = new HBaseTestingUtility();
+    conf = HTU.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    // Disable automatic MOB compaction
+    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
+    // Disable automatic MOB file cleaner chore
+    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
+    // Set minimum age to archive to 10 sec
+    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
+    // Set compacted file discharger interval to a half minAgeToArchive
+    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
+    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
+    HTU.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void htuStop() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates a MOB-enabled table named after the running test method, pre-split into
+   * {@link #numRegions} regions.
+   */
+  @Before
+  public void setUp() throws Exception {
+    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
+    admin = HTU.getAdmin();
+    cleanerChore = new MobFileCleanerChore();
+    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
+    familyDescriptor.setMobEnabled(true);
+    familyDescriptor.setMobThreshold(mobLen);
+    familyDescriptor.setMaxVersions(1);
+    tableDescriptor.setColumnFamily(familyDescriptor);
+    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
+    byte[][] splitKeys = splitAlgo.split(numRegions);
+    table = HTU.createTable(tableDescriptor, splitKeys).getName();
+  }
+
+  /**
+   * Writes {@code num} rows with random 32-byte keys and the fixed MOB value, then flushes the
+   * table. Any exception is logged and turned into a test failure.
+   */
+  private void loadData(TableName tableName, int num) {
+    Random r = new Random();
+    LOG.info("Started loading {} rows into {}", num, tableName);
+    try (final Table mobTable = HTU.getConnection().getTable(tableName)) {
+      for (int i = 0; i < num; i++) {
+        byte[] key = new byte[32];
+        r.nextBytes(key);
+        Put p = new Put(key);
+        p.addColumn(fam, qualifier, mobVal);
+        mobTable.put(p);
+      }
+      admin.flush(tableName);
+      LOG.info("Finished loading {} rows into {}", num, tableName);
+    } catch (Exception e) {
+      // The stack trace goes to the log; fail() only carries the summary.
+      LOG.error("MOB file compaction chore test FAILED", e);
+      fail("MOB file compaction chore test FAILED");
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableDescriptor.getTableName());
+    admin.deleteTable(tableDescriptor.getTableName());
+  }
+
+  @Test
+  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
+    LOG.info("MOB compaction {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    mobCompact(tableDescriptor, familyDescriptor);
+    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
+      getNumberOfMobFiles(table, famStr));
+    cleanupAndVerifyCounts(table, famStr, 3 * rows);
+    LOG.info("MOB compaction {} finished OK", description());
+  }
+
+  @Test
+  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
+    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    LOG.info("MOB compaction of cloned snapshot, {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    LOG.debug("Taking snapshot and cloning table {}", table);
+    admin.snapshot(test.getMethodName(), table);
+    admin.cloneSnapshot(test.getMethodName(), clone);
+    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
+      getNumberOfMobFiles(clone, famStr));
+    mobCompact(admin.getDescriptor(clone), familyDescriptor);
+    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
+      4 * numRegions, getNumberOfMobFiles(clone, famStr));
+    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
+    LOG.info("MOB compaction of cloned snapshot, {} finished OK", description());
+  }
+
+  @Test
+  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
+    throws InterruptedException, IOException {
+    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+    LOG.info("MOB compaction of cloned snapshot after flush, {} started", description());
+    loadAndFlushThreeTimes(rows, table, famStr);
+    LOG.debug("Taking snapshot and cloning table {}", table);
+    admin.snapshot(test.getMethodName(), table);
+    admin.cloneSnapshot(test.getMethodName(), clone);
+    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
+      getNumberOfMobFiles(clone, famStr));
+    loadAndFlushThreeTimes(rows, clone, famStr);
+    mobCompact(admin.getDescriptor(clone), familyDescriptor);
+    assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
+      7 * numRegions, getNumberOfMobFiles(clone, famStr));
+    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
+    LOG.info("MOB compaction of cloned snapshot w flush, {} finished OK", description());
+  }
+
+  /**
+   * Loads and flushes {@code rows} rows three times, then asserts that the MOB directory of the
+   * table grew by exactly three files per region.
+   */
+  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
+    throws IOException {
+    final long start = getNumberOfMobFiles(table, family);
+    // Load and flush data 3 times
+    loadData(table, rows);
+    loadData(table, rows);
+    loadData(table, rows);
+    assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
+      getNumberOfMobFiles(table, family));
+  }
+
+  /** Returns the label used in log messages to identify the compaction mode under test. */
+  protected String description() {
+    return "regular mode";
+  }
+
+  protected void enableCompactions() throws IOException {
+    switchCompactions(true);
+  }
+
+  protected void disableCompactions() throws IOException {
+    switchCompactions(false);
+  }
+
+  /** Turns compactions on or off on every region server currently known to the admin. */
+  private void switchCompactions(boolean enabled) throws IOException {
+    final List<String> serverList =
+      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
+    admin.compactionSwitch(enabled, serverList);
+  }
+
+  /**
+   * compact the given table and return once it is done. should presume compactions are disabled
+   * when called. should ensure compactions are disabled before returning.
+   */
+  protected void mobCompact(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    LOG.debug("Major compact MOB table {}", tableDescriptor.getTableName());
+    enableCompactions();
+    try {
+      mobCompactImpl(tableDescriptor, familyDescriptor);
+      waitUntilCompactionIsComplete(tableDescriptor.getTableName());
+    } finally {
+      // Honor the contract above even when the compaction request or the wait throws, so a
+      // failure here does not leave compactions enabled for whatever runs next.
+      disableCompactions();
+    }
+  }
+
+  /**
+   * Call the API for compaction specific to the test set. should not wait for compactions to
+   * finish. may assume compactions are enabled when called.
+   */
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
+  }
+
+  /**
+   * Polls the compaction state of the table every 100ms until it reports
+   * {@link CompactionState#NONE}. There is no timeout of its own in this method.
+   */
+  protected void waitUntilCompactionIsComplete(TableName table)
+    throws IOException, InterruptedException {
+    CompactionState state = admin.getCompactionState(table);
+    while (state != CompactionState.NONE) {
+      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
+      Thread.sleep(100);
+      state = admin.getCompactionState(table);
+    }
+    LOG.debug("done waiting for compaction on {}", table);
+  }
+
+  /**
+   * Waits past the minimum age to archive, runs the MOB cleaner for the table, then asserts that
+   * one MOB file per region is left and that a full scan returns {@code rows} rows.
+   */
+  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
+    throws InterruptedException, IOException {
+    // We have guarantee, that compacted file discharger will run during this pause
+    // because it has interval less than this wait time
+    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+    Thread.sleep(minAgeToArchive + 1000);
+    LOG.info("Cleaning up MOB files");
+    // Remove the MOB files made obsolete by the compaction
+    cleanerChore.cleanupObsoleteMobFiles(conf, table);
+
+    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
+      getNumberOfMobFiles(table, family));
+
+    LOG.debug("checking count of rows");
+    long scanned = scanTable(table);
+    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
+
+  }
+
+  /**
+   * Counts the entries directly under the MOB family directory of the table, logging each path at
+   * debug level.
+   */
+  protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
+    FileStatus[] stat = fs.listStatus(dir);
+    for (FileStatus st : stat) {
+      LOG.debug("MOB Directory content: {}", st.getPath());
+    }
+    LOG.debug("MOB Directory content total files: {}", stat.length);
+
+    return stat.length;
+  }
+
+  /**
+   * Scans the whole test family of the table, asserting that every row carries the expected MOB
+   * value.
+   * @return the number of rows scanned
+   * @throws AssertionError if a value differs, or if the scan itself fails (the original
+   *                        exception is attached as the cause)
+   */
+  protected long scanTable(TableName tableName) {
+    try (final Table mobTable = HTU.getConnection().getTable(tableName);
+      final ResultScanner scanner = mobTable.getScanner(fam)) {
+      long counter = 0;
+      for (Result result = scanner.next(); result != null; result = scanner.next()) {
+        assertTrue("Unexpected MOB value in row " + Bytes.toStringBinary(result.getRow()),
+          Arrays.equals(result.getValue(fam, qualifier), mobVal));
+        counter++;
+      }
+      return counter;
+    } catch (Exception e) {
+      LOG.error("MOB file compaction test FAILED", e);
+      // Report through the test framework and keep the cause; never exit the JVM from a test,
+      // that takes the whole forked test process down instead of failing one test.
+      throw new AssertionError("MOB file compaction test FAILED: " + e.getMessage(), e);
+    }
+  }
+}

Reply via email to