http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java new file mode 100644 index 0000000..d63bb95 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java @@ -0,0 +1,924 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.security.Key; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.crypto.spec.SecretKeySpec; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; +import org.apache.hadoop.hbase.io.crypto.aes.AES; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.compactions.MobCompactor; 
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestMobCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private Configuration conf = null; + private String tableNameAsString; + private TableName tableName; + private static Connection conn; + private BufferedMutator bufMut; + private Table hTable; + private Admin admin; + private HTableDescriptor desc; + private HColumnDescriptor hcd1; + private HColumnDescriptor hcd2; + private FileSystem fs; + private static final String family1 = "family1"; + private static final String family2 = "family2"; + private static final String qf1 = "qualifier1"; + private static final String qf2 = "qualifier2"; + private static byte[] KEYS = Bytes.toBytes("012"); + private static int regionNum = KEYS.length; + private static int delRowNum = 1; + private static int delCellNum = 6; + private static int cellNumPerRow = 3; + private static int rowNumPerFile = 2; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration() + .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000); + TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, + KeyProviderForTesting.class.getName()); + TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); + TEST_UTIL.startMiniCluster(1); + pool = createThreadPool(TEST_UTIL.getConfiguration()); + conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + fs = TEST_UTIL.getTestFileSystem(); + conf = TEST_UTIL.getConfiguration(); + long tid = System.currentTimeMillis(); + tableNameAsString = "testMob" + tid; + tableName = TableName.valueOf(tableNameAsString); + hcd1 = new HColumnDescriptor(family1); + hcd1.setMobEnabled(true); + hcd1.setMobThreshold(0L); + hcd1.setMaxVersions(4); + hcd2 = new HColumnDescriptor(family2); + hcd2.setMobEnabled(true); + hcd2.setMobThreshold(0L); + hcd2.setMaxVersions(4); + desc = new HTableDescriptor(tableName); + desc.addFamily(hcd1); + desc.addFamily(hcd2); + admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc, getSplitKeys()); + hTable = conn.getTable(tableName); + bufMut = conn.getBufferedMutator(tableName); + } + + @After + 
public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.close(); + hTable.close(); + fs.delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCompactionWithoutDelFilesWithNamespace() throws Exception { + resetConf(); + // create a table with namespace + NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build(); + String tableNameAsString = "ns:testCompactionWithoutDelFilesWithNamespace"; + admin.createNamespace(namespaceDescriptor); + TableName tableName = TableName.valueOf(tableNameAsString); + HColumnDescriptor hcd1 = new HColumnDescriptor(family1); + hcd1.setMobEnabled(true); + hcd1.setMobThreshold(0L); + hcd1.setMaxVersions(4); + HColumnDescriptor hcd2 = new HColumnDescriptor(family2); + hcd2.setMobEnabled(true); + hcd2.setMobThreshold(0L); + hcd2.setMaxVersions(4); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(hcd1); + desc.addFamily(hcd2); + admin.createTable(desc, getSplitKeys()); + BufferedMutator bufMut= conn.getBufferedMutator(tableName); + Table table = conn.getTable(tableName); + + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count * rowNumPerFile; + + assertEquals("Before compaction: mob rows count", regionNum * rowNumPerRegion, + countMobRows(table)); + assertEquals("Before compaction: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1)); + + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum * rowNumPerRegion, + countMobRows(table)); + assertEquals("After compaction: mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1)); + + table.close(); + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.deleteNamespace("ns"); + } + + @Test + public void testCompactionWithoutDelFiles() throws Exception { + resetConf(); + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before compaction: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1)); + + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After compaction: mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1)); + } + + @Test + public void testCompactionWithoutDelFilesAndWithEncryption() throws Exception { + resetConf(); + Configuration conf = TEST_UTIL.getConfiguration(); + SecureRandom rng = new SecureRandom(); + byte[] keyBytes = new byte[AES.KEY_LENGTH]; + rng.nextBytes(keyBytes); + String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); + Key cfKey = new SecretKeySpec(keyBytes, algorithm); + byte[] encryptionKey 
= EncryptionUtil.wrapKey(conf, + conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey); + String tableNameAsString = "testCompactionWithoutDelFilesAndWithEncryption"; + TableName tableName = TableName.valueOf(tableNameAsString); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(family1); + hcd.setMobEnabled(true); + hcd.setMobThreshold(0); + hcd.setMaxVersions(4); + hcd.setEncryptionType(algorithm); + hcd.setEncryptionKey(encryptionKey); + HColumnDescriptor hcd2 = new HColumnDescriptor(family2); + hcd2.setMobEnabled(true); + hcd2.setMobThreshold(0); + hcd2.setMaxVersions(4); + desc.addFamily(hcd); + desc.addFamily(hcd2); + admin.createTable(desc, getSplitKeys()); + Table hTable = conn.getTable(tableName); + BufferedMutator bufMut = conn.getBufferedMutator(tableName); + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before compaction: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1)); + + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After compaction: mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1)); + Assert.assertTrue(verifyEncryption(tableName, family1)); + } + + @Test + public void testCompactionWithDelFiles() throws Exception { + resetConf(); + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + 
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception { + resetConf(); + int mergeSize = 5000; + // change the mob compaction merge size + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + + int largeFilesCount = countLargeFiles(mergeSize, family1); + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + // After the compaction, the files smaller than the mob compaction merge size + // are merged into one file + assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + @Test + public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception { + resetConf(); + int batchSize = 2; + conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize); + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: family1
mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob compaction + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize), + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + @Test + public void testCompactionWithHFileLink() throws IOException, InterruptedException { + resetConf(); + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + long tid = System.currentTimeMillis(); + byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid); + // take a snapshot + admin.snapshot(snapshotName1, tableName); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob compaction + MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After first compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After first compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After first compaction: family2 mob file count", regionNum*count, + 
countFiles(tableName, true, family2)); + assertEquals("After first compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After first compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1)); + assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2)); + + admin.disableTable(tableName); + // Restore from snapshot, the hfilelink will exist in mob dir + admin.restoreSnapshot(snapshotName1); + admin.enableTable(tableName); + + assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After restoring snapshot: mob cells count", + regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable)); + assertEquals("After restoring snapshot: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("After restoring snapshot: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After restoring snapshot: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After restoring snapshot: family2 del file count", 0, + countFiles(tableName, false, family2)); + assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count, + countHFileLinks(family1)); + assertEquals("After restoring snapshot: family2 hfilelink count", 0, + countHFileLinks(family2)); + + compactor.compact(); + + assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After second compaction: mob cells count", + regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable)); + assertEquals("After second compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After second compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After second compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After second compaction: family2 del file count", 0, + countFiles(tableName, false, family2)); + assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1)); + assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + 
countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + int largeFilesCount = countLargeFiles(5000, family1); + // do the mob compaction + admin.compactMob(tableName, hcd1.getName()); + + waitUntilCompactionFinished(tableName); + assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", regionNum + * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum * count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testMajorCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files + loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: mob file count", regionNum*count, + countFiles(tableName, true, family1)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the major mob compaction; it forces all the files to be compacted + admin.majorCompactMob(tableName, hcd1.getName()); + + waitUntilCompactionFinished(tableName); + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + private void waitUntilCompactionFinished(TableName tableName) throws
IOException, + InterruptedException { + long finished = EnvironmentEdgeManager.currentTime() + 60000; + CompactionState state = admin.getMobCompactionState(tableName); + while (EnvironmentEdgeManager.currentTime() < finished) { + if (state == CompactionState.NONE) { + break; + } + state = admin.getMobCompactionState(tableName); + Thread.sleep(10); + } + assertEquals(CompactionState.NONE, state); + } + + /** + * Gets the number of rows in the given table. + * @param table the table to scan + * @return the number of rows + */ + private int countMobRows(final Table table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count++; + } + results.close(); + return count; + } + + /** + * Gets the number of cells in the given table. + * @param table the table to scan + * @return the number of cells + */ + private int countMobCells(final Table table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + for (Cell cell : res.listCells()) { + count++; + } + } + results.close(); + return count; + } + + /** + * Gets the number of files in the mob path. + * @param tableName the table name + * @param isMobFile true to count the mob files, false to count the del files + * @param familyName the family name + * @return the number of files + */ + private int countFiles(TableName tableName, boolean isMobFile, String familyName) + throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (isMobFile) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } else { + if (StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } + } + } + return count; + } + + private boolean verifyEncryption(TableName tableName, String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath(MobUtils.getMobRegionPath(conf, tableName), + familyName); + boolean hasFiles = false; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + hasFiles = files != null && files.length > 0; + Assert.assertTrue(hasFiles); + Path path = files[0].getPath(); + CacheConfig cacheConf = new CacheConfig(conf); + StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf, + BloomType.NONE); + HFile.Reader reader = sf.createReader().getHFileReader(); + byte[] encryptionKey = reader.getTrailer().getEncryptionKey(); + Assert.assertTrue(null != encryptionKey); + Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName() + .equals(HConstants.CIPHER_AES)); + } + return hasFiles; + } + + /** + * Gets the number of HFileLinks in the mob path.
+ * @param familyName the family name + * @return the number of HFileLinks + */ + private int countHFileLinks(String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (HFileLink.isHFileLink(file.getPath())) { + count++; + } + } + } + return count; + } + + /** + * Gets the number of large mob files. + * @param size the size threshold + * @param familyName the family name + * @return the number of files larger than the size + */ + private int countLargeFiles(int size, String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + // ignore the del files in the mob path + if ((!StoreFileInfo.isDelFile(file.getPath())) + && (file.getLen() > size)) { + count++; + } + } + } + return count; + } + + /** + * Loads data into the table. + */ + private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum, + int rowNumPerFile) throws IOException, InterruptedException { + if (fileNum <= 0) { + throw new IllegalArgumentException("fileNum must be positive"); + } + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + for (int i = 0; i < fileNum * rowNumPerFile; i++) { + byte[] key = Bytes.add(k, Bytes.toBytes(i)); + byte[] mobVal = makeDummyData(10 * (i + 1)); + Put put = new Put(key); + put.setDurability(Durability.SKIP_WAL); + put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal); + put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal); + put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal); + table.mutate(put); + if ((i + 1) % rowNumPerFile == 0) { + table.flush(); + admin.flush(tableName); + } + } + } + } + + /** + * Deletes a family, a row and a cell in each region to create the del files. + */ + private void createDelFile() throws IOException, InterruptedException { + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + // delete a family + byte[] key1 = Bytes.add(k, Bytes.toBytes(0)); + Delete delete1 = new Delete(key1); + delete1.addFamily(Bytes.toBytes(family1)); + hTable.delete(delete1); + // delete one row + byte[] key2 = Bytes.add(k, Bytes.toBytes(2)); + Delete delete2 = new Delete(key2); + hTable.delete(delete2); + // delete one cell + byte[] key3 = Bytes.add(k, Bytes.toBytes(4)); + Delete delete3 = new Delete(key3); + delete3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1)); + hTable.delete(delete3); + admin.flush(tableName); + List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions( + Bytes.toBytes(tableNameAsString)); + for (HRegion region : regions) { + region.waitForFlushesAndCompactions(); + region.compact(true); + } + } + } + + /** + * Creates the dummy data with a specific size.
+ * @param size the size of value + * @return the dummy data + */ + private byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + return dummyData; + } + + /** + * Gets the split keys + */ + private byte[][] getSplitKeys() { + byte[][] splitKeys = new byte[KEYS.length - 1][]; + for (int i = 0; i < splitKeys.length; ++i) { + splitKeys[i] = new byte[] { KEYS[i + 1] }; + } + return splitKeys; + } + + private static ExecutorService createThreadPool(Configuration conf) { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, + keepAliveTime, TimeUnit.SECONDS, queue, + Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + private void assertRefFileNameEqual(String familyName) throws IOException { + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes(familyName)); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = hTable.getScanner(scan); + Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), + tableName), familyName); + List<Path> actualFilePaths = new ArrayList<>(); + List<Path> expectFilePaths = new ArrayList<>(); + for (Result res : results) { + for (Cell cell : res.listCells()) { + byte[] referenceValue = CellUtil.cloneValue(cell); + String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT, + referenceValue.length - Bytes.SIZEOF_INT); + Path targetPath = new Path(mobFamilyPath, fileName); + if(!actualFilePaths.contains(targetPath)) { + actualFilePaths.add(targetPath); + } + } + } + results.close(); + if (fs.exists(mobFamilyPath)) { + FileStatus[] files = fs.listStatus(mobFamilyPath); + for (FileStatus file : files) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + expectFilePaths.add(file.getPath()); + } + } + } + Collections.sort(actualFilePaths); + Collections.sort(expectFilePaths); + assertEquals(expectFilePaths, actualFilePaths); + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + } +}
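Note on the pool used above: createThreadPool() pairs a ThreadPoolExecutor with a SynchronousQueue and a RejectedExecutionHandler that re-submits rejected tasks via queue.put(r). A SynchronousQueue has no capacity, so once all maxThreads workers are busy the executor rejects new tasks; the handler then blocks the submitting thread on put() until a worker polls the queue, turning rejection into back-pressure instead of a RejectedExecutionException. The following is a minimal, self-contained sketch of that pattern, not part of the patch; the class name and pool sizes are illustrative only.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffPoolDemo {
  public static void main(String[] args) throws Exception {
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, queue,
        new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
              // Block the submitter until a worker takes the hand-off,
              // instead of throwing a RejectedExecutionException.
              queue.put(r);
            } catch (InterruptedException e) {
              throw new RejectedExecutionException(e);
            }
          }
        });
    pool.allowCoreThreadTimeOut(true);
    for (int i = 0; i < 6; i++) {
      final int id = i;
      // With maxThreads = 2, a submit made while both workers are busy
      // blocks in the handler until one of them becomes free.
      pool.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}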
http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java new file mode 100644 index 0000000..fabc4e2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactionRequest.java @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestPartitionedMobCompactionRequest { + + @Test + public void testCompactedPartitionId() { + String startKey1 = "startKey1"; + String startKey2 = "startKey2"; + String date1 = "date1"; + String date2 = "date2"; + CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, date1); + CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, date2); + CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, date2); + + Assert.assertTrue(partitionId1.equals(partitionId1)); + Assert.assertFalse(partitionId1.equals(partitionId2)); + Assert.assertFalse(partitionId1.equals(partitionId3)); + Assert.assertFalse(partitionId2.equals(partitionId3)); + + Assert.assertEquals(startKey1, partitionId1.getStartKey()); + Assert.assertEquals(date1, partitionId1.getDate()); + } + + @Test + public void testCompactedPartition() { + CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", "date1"); + CompactionPartition partition = new CompactionPartition(partitionId); + FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test")); + partition.addFile(file); + Assert.assertEquals(file, partition.listFiles().get(0)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java new file mode 100644 index 0000000..e6d2b98 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -0,0 +1,440 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; +import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPartitionedMobCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String family = "family"; + private final static String qf = "qf"; + private HColumnDescriptor hcd = new HColumnDescriptor(family); + private Configuration conf = TEST_UTIL.getConfiguration(); + private CacheConfig cacheConf = new 
CacheConfig(conf); + private FileSystem fs; + private List<FileStatus> mobFiles = new ArrayList<>(); + private List<FileStatus> delFiles = new ArrayList<>(); + private List<FileStatus> allFiles = new ArrayList<>(); + private Path basePath; + private String mobSuffix; + private String delSuffix; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.startMiniCluster(1); + pool = createThreadPool(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); + TEST_UTIL.shutdownMiniCluster(); + } + + private void init(String tableName) throws Exception { + fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); + basePath = new Path(new Path(mobTestDir, tableName), family); + mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); + delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + } + + @Test + public void testCompactionSelectWithAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < mergeSize) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithPartFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithPartFiles"; + init(tableName); + int count = 10; + // create 10 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < mergeSize) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + // set the mob compaction mergeable threshold + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithForceAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithForceAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files.
+ createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + // set the mob compaction mergeable threshold + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys); + } + + @Test + public void testCompactDelFilesWithDefaultBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithDefaultBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + testCompactDelFiles(tableName, 1, 13, false); + } + + @Test + public void testCompactDelFilesWithSmallBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithSmallBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the mob compaction batch size + conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 4); + testCompactDelFiles(tableName, 1, 13, false); + } + + @Test + public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesChangeMaxDelFileCount"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the max del file count + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5); + // set the mob compaction batch size + conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 2); + testCompactDelFiles(tableName, 4, 13, false); + } + + /** + * Tests selectFiles. + * @param tableName the table name + * @param type the expected compaction type + * @param isForceAllFiles whether all the mob files are selected + * @param expected the expected start keys + */ + private void testSelectFiles(String tableName, final CompactionType type, + final boolean isForceAllFiles, final List<String> expected) throws IOException { + PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + // assert the compaction type is the expected one + Assert.assertEquals(type, request.type); + // assert the right partitions are selected + compareCompactedPartitions(expected, request.compactionPartitions); + // assert the right del files are selected + compareDelFiles(request.delFiles); + return null; + } + }; + compactor.compact(allFiles, isForceAllFiles); + } + + /** + * Tests compactDelFiles. + * @param tableName the table name + * @param expectedFileCount the expected file count + * @param expectedCellCount the expected cell count + * @param isForceAllFiles whether all the mob files are selected + */ + private void testCompactDelFiles(String tableName, final int
expectedFileCount, + final int expectedCellCount, boolean isForceAllFiles) throws IOException { + PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + protected List<Path> performCompaction(PartitionedMobCompactionRequest request) + throws IOException { + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + // assert the del files are merged. + Assert.assertEquals(expectedFileCount, newDelPaths.size()); + Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); + return null; + } + }; + compactor.compact(allFiles, isForceAllFiles); + } + + /** + * Lists the files in the base path. + */ + private void listFiles() throws IOException { + for (FileStatus file : fs.listStatus(basePath)) { + allFiles.add(file); + if (file.getPath().getName().endsWith("_del")) { + delFiles.add(file); + } else { + mobFiles.add(file); + } + } + } + + /** + * Compares the compacted partitions. + * @param expected the expected start keys + * @param partitions the collection of CompactionPartitions + */ + private void compareCompactedPartitions(List<String> expected, + Collection<CompactionPartition> partitions) { + List<String> actualKeys = new ArrayList<>(); + for (CompactionPartition partition : partitions) { + actualKeys.add(partition.getPartitionId().getStartKey()); + } + Collections.sort(expected); + Collections.sort(actualKeys); + Assert.assertEquals(expected.size(), actualKeys.size()); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals(expected.get(i), actualKeys.get(i)); + } + } + + /** + * Compares the del files. + * @param allDelFiles all the del files + */ + private void compareDelFiles(Collection<FileStatus> allDelFiles) { + int i = 0; + for (FileStatus file : allDelFiles) { + Assert.assertEquals(delFiles.get(i), file); + i++; + } + } + + /** + * Creates store files. + * @param basePath the path where the files are created + * @param family the family name + * @param qualifier the column qualifier + * @param count the number of store files + * @param type the key type + */ + private void createStoreFiles(Path basePath, String family, String qualifier, int count, + Type type) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + String startKey = "row_"; + MobFileName mobFileName = null; + for (int i = 0; i < count; i++) { + byte[] startRow = Bytes.toBytes(startKey + i); + if (type.equals(Type.Delete)) { + mobFileName = MobFileName.create(startRow, MobUtils.formatDate( + new Date()), delSuffix); + } + if (type.equals(Type.Put)) { + mobFileName = MobFileName.create(startRow, MobUtils.formatDate( + new Date()), mobSuffix); + } + StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), + type, (i+1)*1000); + } + } + + /** + * Writes data to a store file.
+ * @param writer the store file writer + * @param row the row key + * @param family the family name + * @param qualifier the column qualifier + * @param type the key type + * @param size the size of value + */ + private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family, + byte[] qualifier, Type type, int size) throws IOException { + long now = System.currentTimeMillis(); + try { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + writer.append(new KeyValue(row, family, qualifier, now, type, dummyData)); + } finally { + writer.close(); + } + } + + /** + * Gets the number of del cells in the del files. + * @param paths the del file paths + * @return the number of del cells + */ + private int countDelCellsInDelFiles(List<Path> paths) throws IOException { + List<StoreFile> sfs = new ArrayList<StoreFile>(); + int size = 0; + for(Path path : paths) { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + sfs.add(sf); + } + List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, null, HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(hcd.getMaxVersions()); + long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + long ttl = HStore.determineTTLFromFamily(hcd); + ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null, + scanners, 0L, HConstants.LATEST_TIMESTAMP); + List<Cell> results = new ArrayList<>(); + boolean hasMore = true; + + while (hasMore) { + hasMore = scanner.next(results); + size += results.size(); + results.clear(); + } + scanner.close(); + return size; + } + + private static ExecutorService createThreadPool() { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + } +} \ No newline at end of file
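Note on the file-name convention these tests rely on: the selection tests recover each expected partition start key with fileName.substring(0, 32), listFiles() classifies del files by the "_del" ending, and files are created via MobFileName.create(startRow, MobUtils.formatDate(new Date()), suffix). Together this implies a layout of a 32-hex-character digest of the start row, a date string, and a random suffix. The following is a minimal sketch of decomposing such a name under that assumption; the 8-digit yyyyMMdd date width is an assumption of this sketch, not stated in the patch.

public class MobFileNameLayoutDemo {
  // First 32 hex chars: digest of the region start key (what the tests compare).
  static String startKeyDigest(String fileName) {
    return fileName.substring(0, 32);
  }

  // Next chars: the date from MobUtils.formatDate; assumed 8-digit yyyyMMdd here.
  static String date(String fileName) {
    return fileName.substring(32, 40);
  }

  // Del files are recognized by their suffix, mirroring listFiles() above.
  static boolean isDelFile(String fileName) {
    return fileName.endsWith("_del");
  }

  public static void main(String[] args) {
    String digest = "d41d8cd98f00b204e9800998ecf8427e"; // 32 hex chars
    String suffix = "0123456789abcdef0123456789abcdef"; // UUID with dashes stripped
    String mobName = digest + "20150623" + suffix;
    System.out.println(startKeyDigest(mobName));     // d41d8cd98f00b204e9800998ecf8427e
    System.out.println(date(mobName));               // 20150623
    System.out.println(isDelFile(mobName + "_del")); // true
  }
}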
