http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
new file mode 100644
index 0000000..d63bb95
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -0,0 +1,924 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.Key;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.io.crypto.aes.AES;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestMobCompactor {
   // Integration test for MOB (medium object) compaction: loads mob data,
   // issues deletes to produce del files, runs PartitionedMobCompactor (both
   // directly and via Admin), and asserts row/cell/file counts at each stage.
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private Configuration conf = null;
+  private String tableNameAsString;
+  private TableName tableName;
   // Shared across all tests; created in setUpBeforeClass with a custom pool.
+  private static Connection conn;
+  private BufferedMutator bufMut;
+  private Table hTable;
+  private Admin admin;
+  private HTableDescriptor desc;
   // Two mob-enabled column families configured per test in setUp().
+  private HColumnDescriptor hcd1;
+  private HColumnDescriptor hcd2;
+  private FileSystem fs;
+  private static final String family1 = "family1";
+  private static final String family2 = "family2";
+  private static final String qf1 = "qualifier1";
+  private static final String qf2 = "qualifier2";
   // Split keys "0","1","2" -> one region per key (see getSplitKeys usage).
+  private static byte[] KEYS = Bytes.toBytes("012");
+  private static int regionNum = KEYS.length;
   // Expected per-region row/cell removal from createDelFile() — presumably
   // one row and six cells per region; confirm against createDelFile's body.
+  private static int delRowNum = 1;
+  private static int delCellNum = 6;
+  private static int cellNumPerRow = 3;
+  private static int rowNumPerFile = 2;
+  private static ExecutorService pool;
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", 
true);
+    TEST_UTIL.getConfiguration()
+      .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
+    TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
+      KeyProviderForTesting.class.getName());
+    
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
"hbase");
+    TEST_UTIL.startMiniCluster(1);
+    pool = createThreadPool(TEST_UTIL.getConfiguration());
+    conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), 
pool);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
     // Release the shared executor and connection, then stop the mini cluster.
+    pool.shutdown();
+    conn.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fs = TEST_UTIL.getTestFileSystem();
+    conf = TEST_UTIL.getConfiguration();
     // Unique table name per test run, derived from the current time.
+    long tid = System.currentTimeMillis();
+    tableNameAsString = "testMob" + tid;
+    tableName = TableName.valueOf(tableNameAsString);
     // Mob threshold 0 means every stored value is written as a mob; keep up
     // to 4 versions per cell.
+    hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    hcd1.setMobThreshold(0L);
+    hcd1.setMaxVersions(4);
+    hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0L);
+    hcd2.setMaxVersions(4);
+    desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    desc.addFamily(hcd2);
+    admin = TEST_UTIL.getHBaseAdmin();
     // Pre-split so the table has regionNum regions from the start.
+    admin.createTable(desc, getSplitKeys());
+    hTable = conn.getTable(tableName);
+    bufMut = conn.getBufferedMutator(tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    hTable.close();
+    fs.delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
   // Compaction with no del files on a table in a non-default namespace:
   // row counts must be unchanged and each region's mob files must be merged
   // into a single file. Locals below intentionally shadow the same-named
   // instance fields, since this test builds its own table.
+  @Test
+  public void testCompactionWithoutDelFilesWithNamespace() throws Exception {
+    resetConf();
+    // create a table with namespace
+    NamespaceDescriptor namespaceDescriptor = 
NamespaceDescriptor.create("ns").build();
+    String tableNameAsString = "ns:testCompactionWithoutDelFilesWithNamespace";
+    admin.createNamespace(namespaceDescriptor);
+    TableName tableName = TableName.valueOf(tableNameAsString);
+    HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    hcd1.setMobThreshold(0L);
+    hcd1.setMaxVersions(4);
+    HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0L);
+    hcd2.setMaxVersions(4);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    desc.addFamily(hcd2);
+    admin.createTable(desc, getSplitKeys());
+    BufferedMutator bufMut= conn.getBufferedMutator(tableName);
+    Table table = conn.getTable(tableName);
+
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count * rowNumPerFile;
+
     // Baseline: count mob files per region, no del files yet.
+    assertEquals("Before compaction: mob rows count", regionNum * 
rowNumPerRegion,
+      countMobRows(table));
+    assertEquals("Before compaction: mob file count", regionNum * count,
+      countFiles(tableName, true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(tableName, 
false, family1));
+
     // Run the partitioned mob compactor directly on family1.
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
     // Rows unchanged; mob files merged down to one per region.
+    assertEquals("After compaction: mob rows count", regionNum * 
rowNumPerRegion,
+      countMobRows(table));
+    assertEquals("After compaction: mob file count", regionNum,
+      countFiles(tableName, true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(tableName, 
false, family1));
+
     // Clean up the namespaced table so it does not outlive this test.
+    table.close();
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.deleteNamespace("ns");
+  }
+
   // Simplest case: compaction with no del files on the per-test table.
   // Row count is preserved and family1's mob files merge to one per region.
+  @Test
+  public void testCompactionWithoutDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob file count", regionNum * count,
+      countFiles(tableName, true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(tableName, 
false, family1));
+
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
     // Rows unchanged; one merged mob file per region, still no del files.
+    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After compaction: mob file count", regionNum,
+      countFiles(tableName, true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(tableName, 
false, family1));
+  }
+
   // Compaction on an AES-encrypted mob family: counts behave as in the
   // unencrypted case and the compacted files must still carry an encryption
   // context (verifyEncryption).
+  @Test
+  public void testCompactionWithoutDelFilesAndWithEncryption() throws 
Exception {
+    resetConf();
+    Configuration conf = TEST_UTIL.getConfiguration();
     // Generate a random data key and wrap it with the cluster master key.
+    SecureRandom rng = new SecureRandom();
+    byte[] keyBytes = new byte[AES.KEY_LENGTH];
+    rng.nextBytes(keyBytes);
+    String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES);
+    Key cfKey = new SecretKeySpec(keyBytes, algorithm);
+    byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
+      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
User.getCurrent().getShortName()), cfKey);
+    String tableNameAsString = 
"testCompactionWithoutDelFilesAndWithEncryption";
+    TableName tableName = TableName.valueOf(tableNameAsString);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
     // family1 is encrypted; family2 is a plain mob family for contrast.
+    HColumnDescriptor hcd = new HColumnDescriptor(family1);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(0);
+    hcd.setMaxVersions(4);
+    hcd.setEncryptionType(algorithm);
+    hcd.setEncryptionKey(encryptionKey);
+    HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0);
+    hcd2.setMaxVersions(4);
+    desc.addFamily(hcd);
+    desc.addFamily(hcd2);
+    admin.createTable(desc, getSplitKeys());
     // NOTE(review): this table, hTable and bufMut are never closed/deleted
     // here, unlike the namespace test — consider adding cleanup.
+    Table hTable = conn.getTable(tableName);
+    BufferedMutator bufMut = conn.getBufferedMutator(tableName);
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob file count", regionNum * count,
+      countFiles(tableName, true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(tableName, 
false, family1));
+
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After compaction: mob file count", regionNum,
+      countFiles(tableName, true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(tableName, 
false, family1));
     // The merged output must remain encrypted.
+    Assert.assertTrue(verifyEncryption(tableName, family1));
+  }
+
   // Compaction with del files present: family1 is compacted (del files
   // consumed, files merged to one per region), family2 is untouched and
   // keeps its mob and del files.
+  @Test
+  public void testCompactionWithDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", 
regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+
     // Produce one del file per region in both families.
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
+    // do the mob file compaction
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
     // family1: one mob file per region, del files removed.
     // family2: untouched (compactor was built with hcd1 only).
+    assertEquals("After compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum,
+        countFiles(tableName, true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After compaction: family1 del file count", 0,
+      countFiles(tableName, false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+    assertRefFileNameEqual(family1);
+  }
+
   // With a small merge threshold, only files at or below mergeSize are
   // merged; larger files survive compaction untouched, and because not all
   // files were compacted the del files must be retained.
+  @Test
+  public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception 
{
+    resetConf();
+    int mergeSize = 5000;
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", 
regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: mob file count", regionNum * count,
+      countFiles(tableName, true, family1));
+
     // Snapshot how many family1 files exceed mergeSize before deleting.
+    int largeFilesCount = countLargeFiles(mergeSize, family1);
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
+    // do the mob file compaction
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    // After the compaction, the files smaller than the mob compaction merge 
size
+    // is merge to one file
+    assertEquals("After compaction: family1 mob file count", largeFilesCount + 
regionNum,
+        countFiles(tableName, true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
     // Del files remain because the large files were not rewritten.
+    assertEquals("After compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+  }
+
   // With batch size 2, each region's `count` files are compacted in
   // count/batchSize batches, leaving count/batchSize output files per region
   // for family1; del files for family1 are consumed.
+  @Test
+  public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() 
throws Exception {
+    resetConf();
+    int batchSize = 2;
+    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
+    // do the mob compaction
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
     // One output file per batch: count/batchSize files per region.
+    assertEquals("After compaction: family1 mob file count", 
regionNum*(count/batchSize),
+        countFiles(tableName, true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After compaction: family1 del file count", 0,
+      countFiles(tableName, false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+  }
+
   // Snapshot/restore interaction: restoring a snapshot materializes
   // HFileLinks in the mob directory; a subsequent compaction must resolve
   // and remove them while preserving the restored data.
+  @Test
+  public void testCompactionWithHFileLink() throws IOException, 
InterruptedException {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    long tid = System.currentTimeMillis();
+    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
+    // take a snapshot
+    admin.snapshot(snapshotName1, tableName);
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
+    // do the mob compaction
+    MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, 
hcd1, pool);
+    compactor.compact();
+
     // First compaction behaves like the plain del-file case; no links yet.
+    assertEquals("After first compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After first compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("After first compaction: family1 mob file count", regionNum,
+        countFiles(tableName, true, family1));
+    assertEquals("After first compaction: family2 mob file count", 
regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After first compaction: family1 del file count", 0,
+      countFiles(tableName, false, family1));
+    assertEquals("After first compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+    assertEquals("After first compaction: family1 hfilelink count", 0, 
countHFileLinks(family1));
+    assertEquals("After first compaction: family2 hfilelink count", 0, 
countHFileLinks(family2));
+
+    admin.disableTable(tableName);
+    // Restore from snapshot, the hfilelink will exist in mob dir
+    admin.restoreSnapshot(snapshotName1);
+    admin.enableTable(tableName);
+
     // Restore brings back the pre-delete data; family1's mob files are now
     // HFileLinks to the snapshotted files.
+    assertEquals("After restoring snapshot: mob rows count", 
regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After restoring snapshot: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After restoring snapshot: family1 mob file count", 
regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("After restoring snapshot: family2 mob file count", 
regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After restoring snapshot: family1 del file count", 0,
+        countFiles(tableName, false, family1));
+    assertEquals("After restoring snapshot: family2 del file count", 0,
+        countFiles(tableName, false, family2));
+    assertEquals("After restoring snapshot: family1 hfilelink count", 
regionNum*count,
+        countHFileLinks(family1));
+    assertEquals("After restoring snapshot: family2 hfilelink count", 0,
+        countHFileLinks(family2));
+
     // Second compaction must consume the links and leave real files only.
+    compactor.compact();
+
+    assertEquals("After second compaction: mob rows count", 
regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After second compaction: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After second compaction: family1 mob file count", regionNum,
+        countFiles(tableName, true, family1));
+    assertEquals("After second compaction: family2 mob file count", 
regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After second compaction: family1 del file count", 0,
+      countFiles(tableName, false, family1));
+    assertEquals("After second compaction: family2 del file count", 0,
+      countFiles(tableName, false, family2));
+    assertEquals("After second compaction: family1 hfilelink count", 0, 
countHFileLinks(family1));
+    assertEquals("After second compaction: family2 hfilelink count", 0, 
countHFileLinks(family2));
+    assertRefFileNameEqual(family1);
+  }
+
   // Minor mob compaction triggered through Admin.compactMob: only files at or
   // below the mergeable threshold are merged, so large files and the del
   // files survive.
   // NOTE(review): unlike the other tests this does not call resetConf()
   // first — presumably it relies on the class-level config; confirm no
   // leakage from a previously run test.
+  @Test
+  public void testCompactionFromAdmin() throws Exception {
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", 
regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
     // Files above 5000 bytes are expected to be skipped by this compaction.
+    int largeFilesCount = countLargeFiles(5000, family1);
+    // do the mob compaction
+    admin.compactMob(tableName, hcd1.getName());
+
     // compactMob is asynchronous; poll until the state returns to NONE.
+    waitUntilCompactionFinished(tableName);
+    assertEquals("After compaction: mob rows count", regionNum * 
(rowNumPerRegion - delRowNum),
+      countMobRows(hTable));
+    assertEquals("After compaction: mob cells count", regionNum
+      * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum + 
largeFilesCount,
+      countFiles(tableName, true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum * count,
+      countFiles(tableName, true, family2));
+    assertEquals("After compaction: family1 del file count", regionNum,
+      countFiles(tableName, false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+      countFiles(tableName, false, family2));
+    assertRefFileNameEqual(family1);
+  }
+
   // Major mob compaction via Admin.majorCompactMob: all family1 files are
   // rewritten regardless of size, so they collapse to one per region and the
   // family1 del files are removed; family2 is untouched.
+  @Test
+  public void testMajorCompactionFromAdmin() throws Exception {
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", 
regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(tableName, true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(tableName, false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+
+    // do the major mob compaction, it will force all files to compaction
+    admin.majorCompactMob(tableName, hcd1.getName());
+
     // majorCompactMob is asynchronous; wait for completion before asserting.
+    waitUntilCompactionFinished(tableName);
+    assertEquals("After compaction: mob rows count", 
regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), 
countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum,
+        countFiles(tableName, true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(tableName, true, family2));
+    assertEquals("After compaction: family1 del file count", 0,
+        countFiles(tableName, false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(tableName, false, family2));
+  }
+
+  private void waitUntilCompactionFinished(TableName tableName) throws 
IOException,
+    InterruptedException {
+    long finished = EnvironmentEdgeManager.currentTime() + 60000;
+    CompactionState state = admin.getMobCompactionState(tableName);
+    while (EnvironmentEdgeManager.currentTime() < finished) {
+      if (state == CompactionState.NONE) {
+        break;
+      }
+      state = admin.getMobCompactionState(tableName);
+      Thread.sleep(10);
+    }
+    assertEquals(CompactionState.NONE, state);
+  }
+
+  /**
+   * Gets the number of rows in the given table.
+   * @param table to get the  scanner
+   * @return the number of rows
+   */
+  private int countMobRows(final Table table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      count++;
+    }
+    results.close();
+    return count;
+  }
+
+  /**
+   * Gets the number of cells in the given table.
+   * @param table to get the  scanner
+   * @return the number of cells
+   */
+  private int countMobCells(final Table table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      for (Cell cell : res.listCells()) {
+        count++;
+      }
+    }
+    results.close();
+    return count;
+  }
+
+  /**
+   * Gets the number of files in the mob path.
+   * @param isMobFile gets number of the mob files or del files
+   * @param familyName the family name
+   * @return the number of the files
+   */
+  private int countFiles(TableName tableName, boolean isMobFile, String 
familyName)
+    throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        if (isMobFile == true) {
+          if (!StoreFileInfo.isDelFile(file.getPath())) {
+            count++;
+          }
+        } else {
+          if (StoreFileInfo.isDelFile(file.getPath())) {
+            count++;
+          }
+        }
+      }
+    }
+    return count;
+  }
+
+  private boolean verifyEncryption(TableName tableName, String familyName) 
throws IOException {
+    Path mobDirPath = 
MobUtils.getMobFamilyPath(MobUtils.getMobRegionPath(conf, tableName),
+      familyName);
+    boolean hasFiles = false;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      hasFiles = files != null && files.length > 0;
+      Assert.assertTrue(hasFiles);
+      Path path = files[0].getPath();
+      CacheConfig cacheConf = new CacheConfig(conf);
+      StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, 
cacheConf,
+        BloomType.NONE);
+      HFile.Reader reader = sf.createReader().getHFileReader();
+      byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
+      Assert.assertTrue(null != encryptionKey);
+      
Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
+        .equals(HConstants.CIPHER_AES));
+    }
+    return hasFiles;
+  }
+
+  /**
+   * Gets the number of HFileLink in the mob path.
+   * @param familyName the family name
+   * @return the number of the HFileLink
+   */
+  private int countHFileLinks(String familyName) throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        if (HFileLink.isHFileLink(file.getPath())) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Gets the number of files.
+   * @param size the size of the file
+   * @param familyName the family name
+   * @return the number of files large than the size
+   */
+  private int countLargeFiles(int size, String familyName) throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        // ignore the del files in the mob path
+        if ((!StoreFileInfo.isDelFile(file.getPath()))
+            && (file.getLen() > size)) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * loads some data to the table.
+   */
+  private void loadData(Admin admin, BufferedMutator table, TableName 
tableName, int fileNum,
+    int rowNumPerFile) throws IOException, InterruptedException {
+    if (fileNum <= 0) {
+      throw new IllegalArgumentException();
+    }
+    for (byte k0 : KEYS) {
+      byte[] k = new byte[] { k0 };
+      for (int i = 0; i < fileNum * rowNumPerFile; i++) {
+        byte[] key = Bytes.add(k, Bytes.toBytes(i));
+        byte[] mobVal = makeDummyData(10 * (i + 1));
+        Put put = new Put(key);
+        put.setDurability(Durability.SKIP_WAL);
+        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
+        put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
+        put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
+        table.mutate(put);
+        if ((i + 1) % rowNumPerFile == 0) {
+          table.flush();
+          admin.flush(tableName);
+        }
+      }
+    }
+  }
+
+  /**
+   * delete the row, family and cell to create the del file
+   */
+  private void createDelFile() throws IOException, InterruptedException {
+    for (byte k0 : KEYS) {
+      byte[] k = new byte[] { k0 };
+      // delete a family
+      byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
+      Delete delete1 = new Delete(key1);
+      delete1.addFamily(Bytes.toBytes(family1));
+      hTable.delete(delete1);
+      // delete one row
+      byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
+      Delete delete2 = new Delete(key2);
+      hTable.delete(delete2);
+      // delete one cell
+      byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
+      Delete delete3 = new Delete(key3);
+      delete3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
+      hTable.delete(delete3);
+      admin.flush(tableName);
+      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
+          Bytes.toBytes(tableNameAsString));
+      for (HRegion region : regions) {
+        region.waitForFlushesAndCompactions();
+        region.compact(true);
+      }
+    }
+  }
+  /**
+   * Creates the dummy data with a specific size.
+   * @param size the size of value
+   * @return the dummy data
+   */
+  private byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+
+  /**
+   * Gets the split keys
+   */
+  private byte[][] getSplitKeys() {
+    byte[][] splitKeys = new byte[KEYS.length - 1][];
+    for (int i = 0; i < splitKeys.length; ++i) {
+      splitKeys[i] = new byte[] { KEYS[i + 1] };
+    }
+    return splitKeys;
+  }
+
+  private static ExecutorService createThreadPool(Configuration conf) {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
+        keepAliveTime, TimeUnit.SECONDS, queue,
+        Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+        new RejectedExecutionHandler() {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor 
executor) {
+            try {
+              // waiting for a thread to pick up instead of throwing 
exceptions.
+              queue.put(r);
+            } catch (InterruptedException e) {
+              throw new RejectedExecutionException(e);
+            }
+          }
+        });
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  private void assertRefFileNameEqual(String familyName) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(familyName));
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = hTable.getScanner(scan);
+    Path mobFamilyPath = new 
Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        tableName), familyName);
+    List<Path> actualFilePaths = new ArrayList<>();
+    List<Path> expectFilePaths = new ArrayList<>();
+    for (Result res : results) {
+      for (Cell cell : res.listCells()) {
+        byte[] referenceValue = CellUtil.cloneValue(cell);
+        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
+            referenceValue.length - Bytes.SIZEOF_INT);
+        Path targetPath = new Path(mobFamilyPath, fileName);
+        if(!actualFilePaths.contains(targetPath)) {
+          actualFilePaths.add(targetPath);
+        }
+      }
+    }
+    results.close();
+    if (fs.exists(mobFamilyPath)) {
+      FileStatus[] files = fs.listStatus(mobFamilyPath);
+      for (FileStatus file : files) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          expectFilePaths.add(file.getPath());
+        }
+      }
+    }
+    Collections.sort(actualFilePaths);
+    Collections.sort(expectFilePaths);
+    assertEquals(expectFilePaths, actualFilePaths);
+  }
+
+  /**
+   * Resets the configuration.
+   */
+  private void resetConf() {
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
+    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java
new file mode 100644
index 0000000..fabc4e2
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestPartitionedMobCompactionRequest {
+
+  @Test
+  public void testCompactedPartitionId() {
+    String startKey1 = "startKey1";
+    String startKey2 = "startKey2";
+    String date1 = "date1";
+    String date2 = "date2";
+    CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, 
date1);
+    CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, 
date2);
+    CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, 
date2);
+
+    Assert.assertTrue(partitionId1.equals(partitionId1));
+    Assert.assertFalse(partitionId1.equals(partitionId2));
+    Assert.assertFalse(partitionId1.equals(partitionId3));
+    Assert.assertFalse(partitionId2.equals(partitionId3));
+
+    Assert.assertEquals(startKey1, partitionId1.getStartKey());
+    Assert.assertEquals(date1, partitionId1.getDate());
+  }
+
+  @Test
+  public void testCompactedPartition() {
+    CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", 
"date1");
+    CompactionPartition partition = new CompactionPartition(partitionId);
+    FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test"));
+    partition.addFile(file);
+    Assert.assertEquals(file, partition.listFiles().get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
new file mode 100644
index 0000000..e6d2b98
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -0,0 +1,440 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+import 
org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
+import 
org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPartitionedMobCompactor {
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private HColumnDescriptor hcd = new HColumnDescriptor(family);
+  private Configuration conf = TEST_UTIL.getConfiguration();
+  private CacheConfig cacheConf = new CacheConfig(conf);
+  private FileSystem fs;
+  private List<FileStatus> mobFiles = new ArrayList<>();
+  private List<FileStatus> delFiles = new ArrayList<>();
+  private List<FileStatus> allFiles = new ArrayList<>();
+  private Path basePath;
+  private String mobSuffix;
+  private String delSuffix;
+  private static ExecutorService pool;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", 
true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+    pool = createThreadPool();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    pool.shutdown();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void init(String tableName) throws Exception {
+    fs = FileSystem.get(conf);
+    Path testDir = FSUtils.getRootDir(conf);
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    basePath = new Path(new Path(mobTestDir, tableName), family);
+    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
+    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+  }
+
+  @Test
+  public void testCompactionSelectWithAllFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithAllFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for(FileStatus file : mobFiles) {
+      if(file.getLen() < mergeSize) {
+        String fileName = file.getPath().getName();
+        String startKey = fileName.substring(0, 32);
+        expectedStartKeys.add(startKey);
+      }
+    }
+    testSelectFiles(tableName, CompactionType.ALL_FILES, false, 
expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactionSelectWithPartFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithPartFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = 4000;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for(FileStatus file : mobFiles) {
+      if(file.getLen() < 4000) {
+        String fileName = file.getPath().getName();
+        String startKey = fileName.substring(0, 32);
+        expectedStartKeys.add(startKey);
+      }
+    }
+    // set the mob compaction mergeable threshold
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+    testSelectFiles(tableName, CompactionType.PART_FILES, false, 
expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactionSelectWithForceAllFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithForceAllFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = 4000;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for(FileStatus file : mobFiles) {
+      String fileName = file.getPath().getName();
+      String startKey = fileName.substring(0, 32);
+      expectedStartKeys.add(startKey);
+    }
+    // set the mob compaction mergeable threshold
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+    testSelectFiles(tableName, CompactionType.ALL_FILES, true, 
expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithDefaultBatchSize";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+    testCompactDelFiles(tableName, 1, 13, false);
+  }
+
+  @Test
+  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithSmallBatchSize";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // set the mob compaction batch size
+    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 4);
+    testCompactDelFiles(tableName, 1, 13, false);
+  }
+
+  @Test
+  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithSmallBatchSize";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // set the max del file count
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
+    // set the mob compaction batch size
+    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 2);
+    testCompactDelFiles(tableName, 4, 13, false);
+  }
+
  /**
   * Tests the file selection of the compactor.
   * Subclasses the compactor so that compact() runs select() and asserts
   * on the resulting request instead of performing a real compaction.
   * @param tableName the table name
   * @param type the expected compaction type
   * @param isForceAllFiles whether all the mob files are selected
   * @param expected the expected start keys
   */
  private void testSelectFiles(String tableName, final CompactionType type,
    final boolean isForceAllFiles, final List<String> expected) throws IOException {
    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
      TableName.valueOf(tableName), hcd, pool) {
      @Override
      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
        throws IOException {
        if (files == null || files.isEmpty()) {
          return null;
        }
        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
        // assert the compaction type
        Assert.assertEquals(type, request.type);
        // assert get the right partitions
        compareCompactedPartitions(expected, request.compactionPartitions);
        // assert get the right del files
        compareDelFiles(request.delFiles);
        return null;
      }
    };
    compactor.compact(allFiles, isForceAllFiles);
  }
+
  /**
   * Tests the compactDelFiles step of the compactor.
   * Subclasses the compactor so that performCompaction() only exercises
   * compactDelFiles() and asserts on the merged result.
   * @param tableName the table name
   * @param expectedFileCount the expected del file count after merging
   * @param expectedCellCount the expected del cell count after merging
   * @param isForceAllFiles whether all the mob files are selected
   */
  private void testCompactDelFiles(String tableName, final int expectedFileCount,
      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
      TableName.valueOf(tableName), hcd, pool) {
      @Override
      protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
          throws IOException {
        List<Path> delFilePaths = new ArrayList<Path>();
        for (FileStatus delFile : request.delFiles) {
          delFilePaths.add(delFile.getPath());
        }
        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
        // assert the del files are merged.
        Assert.assertEquals(expectedFileCount, newDelPaths.size());
        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
        return null;
      }
    };
    compactor.compact(allFiles, isForceAllFiles);
  }
+
+  /**
+   * Lists the files in the path
+   */
+  private void listFiles() throws IOException {
+    for (FileStatus file : fs.listStatus(basePath)) {
+      allFiles.add(file);
+      if (file.getPath().getName().endsWith("_del")) {
+        delFiles.add(file);
+      } else {
+        mobFiles.add(file);
+      }
+    }
+  }
+
+  /**
+   * Compares the compacted partitions.
+   * @param partitions the collection of CompactedPartitions
+   */
+  private void compareCompactedPartitions(List<String> expected,
+      Collection<CompactionPartition> partitions) {
+    List<String> actualKeys = new ArrayList<>();
+    for (CompactionPartition partition : partitions) {
+      actualKeys.add(partition.getPartitionId().getStartKey());
+    }
+    Collections.sort(expected);
+    Collections.sort(actualKeys);
+    Assert.assertEquals(expected.size(), actualKeys.size());
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals(expected.get(i), actualKeys.get(i));
+    }
+  }
+
+  /**
+   * Compares the del files.
+   * @param allDelFiles all the del files
+   */
+  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+    int i = 0;
+    for (FileStatus file : allDelFiles) {
+      Assert.assertEquals(delFiles.get(i), file);
+      i++;
+    }
+  }
+
+  /**
+   * Creates store files.
+   * @param basePath the path to create file
+   * @family the family name
+   * @qualifier the column qualifier
+   * @count the store file number
+   * @type the key type
+   */
+  private void createStoreFiles(Path basePath, String family, String 
qualifier, int count,
+      Type type) throws IOException {
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 
1024).build();
+    String startKey = "row_";
+    MobFileName mobFileName = null;
+    for (int i = 0; i < count; i++) {
+      byte[] startRow = Bytes.toBytes(startKey + i) ;
+      if(type.equals(Type.Delete)) {
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
+            new Date()), delSuffix);
+      }
+      if(type.equals(Type.Put)){
+        mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), 
MobUtils.formatDate(
+            new Date()), mobSuffix);
+      }
+      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, 
cacheConf, fs)
+      .withFileContext(meta).withFilePath(new Path(basePath, 
mobFileName.getFileName())).build();
+      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), 
Bytes.toBytes(qualifier),
+          type, (i+1)*1000);
+    }
+  }
+
+  /**
+   * Writes data to store file.
+   * @param writer the store file writer
+   * @param row the row key
+   * @param family the family name
+   * @param qualifier the column qualifier
+   * @param type the key type
+   * @param size the size of value
+   */
+  private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, 
byte[] family,
+      byte[] qualifier, Type type, int size) throws IOException {
+    long now = System.currentTimeMillis();
+    try {
+      byte[] dummyData = new byte[size];
+      new Random().nextBytes(dummyData);
+      writer.append(new KeyValue(row, family, qualifier, now, type, 
dummyData));
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Gets the number of del cell in the del files
+   * @param paths the del file paths
+   * @return the cell size
+   */
+  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    int size = 0;
+    for(Path path : paths) {
+      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+      sfs.add(sf);
+    }
+    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+        false, null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(hcd.getMaxVersions());
+    long timeToPurgeDeletes = 
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
+    long ttl = HStore.determineTTLFromFamily(hcd);
+    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, 
CellComparator.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, 
ScanType.COMPACT_RETAIN_DELETES, null,
+        scanners, 0L, HConstants.LATEST_TIMESTAMP);
+    List<Cell> results = new ArrayList<>();
+    boolean hasMore = true;
+
+    while (hasMore) {
+      hasMore = scanner.next(results);
+      size += results.size();
+      results.clear();
+    }
+    scanner.close();
+    return size;
+  }
+
+  private static ExecutorService createThreadPool() {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 
keepAliveTime,
+      TimeUnit.SECONDS, queue, 
Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+      new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
{
+          try {
+            // waiting for a thread to pick up instead of throwing exceptions.
+            queue.put(r);
+          } catch (InterruptedException e) {
+            throw new RejectedExecutionException(e);
+          }
+        }
+      });
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Resets the configuration.
+   */
+  private void resetConf() {
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 
MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
+  }
+}
\ No newline at end of file

Reply via email to