http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java index e4cad6f,0000000..ba0b620 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java @@@ -1,827 -1,0 +1,822 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; ++import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; - import org.apache.hadoop.hbase.client.Admin; - import org.apache.hadoop.hbase.client.Delete; - import org.apache.hadoop.hbase.client.Durability; - import org.apache.hadoop.hbase.client.HTable; - import org.apache.hadoop.hbase.client.Put; - import org.apache.hadoop.hbase.client.Result; - import org.apache.hadoop.hbase.client.ResultScanner; - import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import 
org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestMobFileCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private Configuration conf = null; + private String tableNameAsString; + private TableName tableName; - private HTable hTable; ++ private static Connection conn; ++ private BufferedMutator bufMut; ++ private Table hTable; + private Admin admin; + private HTableDescriptor desc; + private HColumnDescriptor hcd1; + private HColumnDescriptor hcd2; + private FileSystem fs; - private final String family1 = "family1"; - private final String family2 = "family2"; - private final String qf1 = "qualifier1"; - private final String qf2 = "qualifier2"; - private byte[] KEYS = Bytes.toBytes("012"); - private int regionNum = KEYS.length; - private int delRowNum = 1; - private int delCellNum = 6; - private int cellNumPerRow = 3; - private int rowNumPerFile = 2; ++ private static final String family1 = "family1"; ++ private static final String family2 = "family2"; ++ private static final String qf1 = "qualifier1"; ++ private static final String qf2 = "qualifier2"; ++ private static byte[] KEYS = Bytes.toBytes("012"); ++ private static int regionNum = KEYS.length; ++ private static int delRowNum = 1; ++ private static int delCellNum = 6; ++ private static int cellNumPerRow = 3; ++ private static int rowNumPerFile = 2; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000); + TEST_UTIL.startMiniCluster(1); + pool = createThreadPool(TEST_UTIL.getConfiguration()); ++ conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); ++ conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + fs = TEST_UTIL.getTestFileSystem(); + conf = TEST_UTIL.getConfiguration(); + long tid = System.currentTimeMillis(); + tableNameAsString = "testMob" + tid; + tableName = TableName.valueOf(tableNameAsString); + hcd1 = new HColumnDescriptor(family1); + hcd1.setMobEnabled(true); + hcd1.setMobThreshold(0L); + hcd1.setMaxVersions(4); + hcd2 = new HColumnDescriptor(family2); + hcd2.setMobEnabled(true); + hcd2.setMobThreshold(0L); + hcd2.setMaxVersions(4); + desc = new HTableDescriptor(tableName); + desc.addFamily(hcd1); + desc.addFamily(hcd2); + admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc, getSplitKeys()); - hTable = new HTable(conf, tableNameAsString); - hTable.setAutoFlush(false, false); ++ hTable = conn.getTable(tableName); ++ bufMut = conn.getBufferedMutator(tableName); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.close(); + hTable.close(); + fs.delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCompactionWithoutDelFilesWithNamespace() throws Exception { + resetConf(); + // create a table with namespace + NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build(); + String tableNameAsString = "ns:testCompactionWithoutDelFilesWithNamespace"; 
+ admin.createNamespace(namespaceDescriptor); + TableName tableName = TableName.valueOf(tableNameAsString); + HColumnDescriptor hcd1 = new HColumnDescriptor(family1); + hcd1.setMobEnabled(true); + hcd1.setMobThreshold(0L); + hcd1.setMaxVersions(4); + HColumnDescriptor hcd2 = new HColumnDescriptor(family2); + hcd2.setMobEnabled(true); + hcd2.setMobThreshold(0L); + hcd2.setMaxVersions(4); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(hcd1); + desc.addFamily(hcd2); + admin.createTable(desc, getSplitKeys()); - HTable table = new HTable(conf, tableName); - table.setAutoFlush(false, false); ++ BufferedMutator bufMut= conn.getBufferedMutator(tableName); ++ Table table = conn.getTable(tableName); + + int count = 4; + // generate mob files - loadData(admin, table, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count * rowNumPerFile; + + assertEquals("Before compaction: mob rows count", regionNum * rowNumPerRegion, + countMobRows(table)); + assertEquals("Before compaction: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1)); + + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum * rowNumPerRegion, + countMobRows(table)); + assertEquals("After compaction: mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1)); + + table.close(); + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.deleteNamespace("ns"); + } + + @Test + public void testCompactionWithoutDelFiles() throws Exception { + resetConf(); + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before compaction: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1)); + + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After compaction: mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1)); + } + + @Test + public void testCompactionWithDelFiles() throws Exception { + resetConf(); + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + 
countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception { + resetConf(); + int mergeSize = 5000; + // change the mob compaction merge size + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: mob file count", regionNum * count, + countFiles(tableName, true, family1)); + + int largeFilesCount = countLargeFiles(mergeSize, family1); + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob 
cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + // After the compaction, the files smaller than the mob compaction merge size + // is merge to one file + assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + @Test + public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception { + resetConf(); + int batchSize = 2; + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize); + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize), + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + @Test + public void testCompactionWithHFileLink() throws IOException, InterruptedException { + resetConf(); + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + long tid = System.currentTimeMillis(); + byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid); + // take a snapshot + admin.snapshot(snapshotName1, tableName); + + createDelFile(); + + 
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the mob file compaction + MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool); + compactor.compact(); + + assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After first compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After first compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After first compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After first compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After first compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1)); + assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2)); + + admin.disableTable(tableName); + // Restore from snapshot, the hfilelink will exist in mob dir + admin.restoreSnapshot(snapshotName1); + admin.enableTable(tableName); + + assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After restoring snapshot: mob cells count", + regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable)); + assertEquals("After restoring snapshot: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("After restoring snapshot: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After restoring snapshot: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After restoring snapshot: family2 del file count", 0, + countFiles(tableName, false, family2)); + assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count, + countHFileLinks(family1)); + assertEquals("After restoring snapshot: family2 hfilelink count", 0, + countHFileLinks(family2)); + + compactor.compact(); + + assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("After second compaction: mob cells count", + regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable)); + assertEquals("After second compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After second compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After second compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After second compaction: family2 del 
file count", 0, + countFiles(tableName, false, family2)); + assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1)); + assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before deleting: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + int largeFilesCount = countLargeFiles(5000, family1); + // do the mob file compaction + admin.compactMob(tableName, hcd1.getName()); + + waitUntilCompactionFinished(tableName); + assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", regionNum + * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum * count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + assertRefFileNameEqual(family1); + } + + @Test + public void testMajorCompactionFromAdmin() throws Exception { + int count = 4; + // generate mob files - loadData(admin, hTable, tableName, count, rowNumPerFile); ++ loadData(admin, bufMut, tableName, count, rowNumPerFile); + int rowNumPerRegion = count*rowNumPerFile; + + assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion, + countMobRows(hTable)); + assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion, + countMobCells(hTable)); + assertEquals("Before deleting: mob file count", regionNum*count, + countFiles(tableName, true, family1)); + + createDelFile(); + + assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("Before compaction: mob cells count", + 
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("Before compaction: family1 mob file count", regionNum*count, + countFiles(tableName, true, family1)); + assertEquals("Before compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("Before compaction: family1 del file count", regionNum, + countFiles(tableName, false, family1)); + assertEquals("Before compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + + // do the major mob file compaction, it will force all files to compaction + admin.majorCompactMob(tableName, hcd1.getName()); + + waitUntilCompactionFinished(tableName); + assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum), + countMobRows(hTable)); + assertEquals("After compaction: mob cells count", + regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable)); + assertEquals("After compaction: family1 mob file count", regionNum, + countFiles(tableName, true, family1)); + assertEquals("After compaction: family2 mob file count", regionNum*count, + countFiles(tableName, true, family2)); + assertEquals("After compaction: family1 del file count", 0, + countFiles(tableName, false, family1)); + assertEquals("After compaction: family2 del file count", regionNum, + countFiles(tableName, false, family2)); + } + + private void waitUntilCompactionFinished(TableName tableName) throws IOException, + InterruptedException { + long finished = EnvironmentEdgeManager.currentTime() + 60000; + CompactionState state = admin.getMobCompactionState(tableName); + while (EnvironmentEdgeManager.currentTime() < finished) { + if (state == CompactionState.NONE) { + break; + } + state = admin.getMobCompactionState(tableName); + Thread.sleep(10); + } + assertEquals(CompactionState.NONE, state); + } + + /** + * Gets the number of rows in the given table. + * @param table to get the scanner + * @return the number of rows + */ - private int countMobRows(final HTable table) throws IOException { ++ private int countMobRows(final Table table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count++; + } + results.close(); + return count; + } + + /** + * Gets the number of cells in the given table. + * @param table to get the scanner + * @return the number of cells + */ - private int countMobCells(final HTable table) throws IOException { ++ private int countMobCells(final Table table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + for (Cell cell : res.listCells()) { + count++; + } + } + results.close(); + return count; + } + + /** + * Gets the number of files in the mob path. 
+ * @param isMobFile true to count the mob files, false to count the del files + * @param familyName the family name + * @return the number of the files + */ + private int countFiles(TableName tableName, boolean isMobFile, String familyName) + throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (isMobFile) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } else { + if (StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } + } + } + return count; + } + + /** + * Gets the number of HFileLinks in the mob path. + * @param familyName the family name + * @return the number of the HFileLinks + */ + private int countHFileLinks(String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (HFileLink.isHFileLink(file.getPath())) { + count++; + } + } + } + return count; + } + + /** + * Gets the number of large files. + * @param size the size threshold of the file + * @param familyName the family name + * @return the number of files larger than the size + */ + private int countLargeFiles(int size, String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + // ignore the del files in the mob path + if ((!StoreFileInfo.isDelFile(file.getPath())) + && (file.getLen() > size)) { + count++; + } + } + } + return count; + } + + /** + * Loads some data into the table.
- * @param count the mob file number + */ - private void loadData(Admin admin, HTable table, TableName tableName, int fileNum, ++ private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum, + int rowNumPerFile) throws IOException, InterruptedException { + if (fileNum <= 0) { + throw new IllegalArgumentException(); + } + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + for (int i = 0; i < fileNum * rowNumPerFile; i++) { + byte[] key = Bytes.add(k, Bytes.toBytes(i)); + byte[] mobVal = makeDummyData(10 * (i + 1)); + Put put = new Put(key); + put.setDurability(Durability.SKIP_WAL); - put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal); - put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal); - put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal); - table.put(put); ++ put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal); ++ put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal); ++ put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal); ++ table.mutate(put); + if ((i + 1) % rowNumPerFile == 0) { - table.flushCommits(); ++ table.flush(); + admin.flush(tableName); + } + } + } + } + + /** + * delete the row, family and cell to create the del file + */ + private void createDelFile() throws IOException, InterruptedException { + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + // delete a family + byte[] key1 = Bytes.add(k, Bytes.toBytes(0)); + Delete delete1 = new Delete(key1); - delete1.deleteFamily(Bytes.toBytes(family1)); ++ delete1.addFamily(Bytes.toBytes(family1)); + hTable.delete(delete1); + // delete one row + byte[] key2 = Bytes.add(k, Bytes.toBytes(2)); + Delete delete2 = new Delete(key2); + hTable.delete(delete2); + // delete one cell + byte[] key3 = Bytes.add(k, Bytes.toBytes(4)); + Delete delete3 = new Delete(key3); - delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1)); ++ delete3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1)); + hTable.delete(delete3); - hTable.flushCommits(); + admin.flush(tableName); + List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions( + Bytes.toBytes(tableNameAsString)); + for (HRegion region : regions) { + region.waitForFlushesAndCompactions(); - region.compactStores(true); ++ region.compact(true); + } + } + } + /** + * Creates the dummy data with a specific size. - * @param the size of data ++ * @param size the size of value + * @return the dummy data + */ + private byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + return dummyData; + } + + /** + * Gets the split keys + */ + private byte[][] getSplitKeys() { + byte[][] splitKeys = new byte[KEYS.length - 1][]; + for (int i = 0; i < splitKeys.length; ++i) { + splitKeys[i] = new byte[] { KEYS[i + 1] }; + } + return splitKeys; + } + + private static ExecutorService createThreadPool(Configuration conf) { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, + keepAliveTime, TimeUnit.SECONDS, queue, + Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. 
+ queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + private void assertRefFileNameEqual(String familyName) throws IOException { + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes(familyName)); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = hTable.getScanner(scan); + Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), + tableName), familyName); + List<Path> actualFilePaths = new ArrayList<>(); + List<Path> expectFilePaths = new ArrayList<>(); + for (Result res : results) { + for (Cell cell : res.listCells()) { + byte[] referenceValue = CellUtil.cloneValue(cell); + String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT, + referenceValue.length - Bytes.SIZEOF_INT); + Path targetPath = new Path(mobFamilyPath, fileName); + if(!actualFilePaths.contains(targetPath)) { + actualFilePaths.add(targetPath); + } + } + } + results.close(); + if (fs.exists(mobFamilyPath)) { + FileStatus[] files = fs.listStatus(mobFamilyPath); + for (FileStatus file : files) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + expectFilePaths.add(file.getPath()); + } + } + } + Collections.sort(actualFilePaths); + Collections.sort(expectFilePaths); + assertEquals(expectFilePaths, actualFilePaths); + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE); + } +}
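The dominant change in the test above is the client API migration: the deprecated HTable write path (setAutoFlush(false, false), put(), flushCommits()) is replaced by a Connection shared across the test class, a Table for reads and deletes, and a BufferedMutator for buffered writes. Below is a minimal sketch of that pattern outside the test harness, assuming a cluster reachable from the default configuration; the class, table, family and qualifier names are illustrative, not from the patch.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteSketch {
  public static void main(String[] args) throws Exception {
    TableName name = TableName.valueOf("testMob"); // illustrative table name
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        BufferedMutator mutator = conn.getBufferedMutator(name)) {
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("qualifier1"),
          Bytes.toBytes("value"));
      // Buffered on the client, like the old autoFlush=false put().
      mutator.mutate(put);
      // Pushes the buffer to the server, replacing HTable.flushCommits().
      mutator.flush();
    } // close() flushes any mutations still buffered.
  }
}

BufferedMutator accumulates mutations client-side and sends them on flush() or close(), which is what the old autoFlush=false plus flushCommits() combination provided.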
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java index 3c73d52,0000000..ed3853e mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java @@@ -1,446 -1,0 +1,441 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; ++import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; - import org.apache.hadoop.hbase.regionserver.BloomType; - import org.apache.hadoop.hbase.regionserver.HStore; - import org.apache.hadoop.hbase.regionserver.ScanInfo; - import org.apache.hadoop.hbase.regionserver.ScanType; - import org.apache.hadoop.hbase.regionserver.StoreFile; - import 
org.apache.hadoop.hbase.regionserver.StoreFileScanner; - import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPartitionedMobFileCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String family = "family"; + private final static String qf = "qf"; + private HColumnDescriptor hcd = new HColumnDescriptor(family); + private Configuration conf = TEST_UTIL.getConfiguration(); + private CacheConfig cacheConf = new CacheConfig(conf); + private FileSystem fs; + private List<FileStatus> mobFiles = new ArrayList<>(); + private List<FileStatus> delFiles = new ArrayList<>(); + private List<FileStatus> allFiles = new ArrayList<>(); + private Path basePath; + private String mobSuffix; + private String delSuffix; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.startMiniCluster(1); - pool = createThreadPool(TEST_UTIL.getConfiguration()); ++ pool = createThreadPool(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); + TEST_UTIL.shutdownMiniCluster(); + } + + private void init(String tableName) throws Exception { + fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); + basePath = new Path(new Path(mobTestDir, tableName), family); + mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); + delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + } + + @Test + public void testCompactionSelectWithAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < mergeSize) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithPartFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithPartFiles"; + init(tableName); + int count = 10; + // create 10 mob files. 
+ createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < mergeSize) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + // set the mob file compaction mergeable threshold + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithForceAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithForceAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + // set the mob file compaction mergeable threshold + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys); + } + + @Test + public void testCompactDelFilesWithDefaultBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithDefaultBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + testCompactDelFiles(tableName, 1, 13, false); + } + + @Test + public void testCompactDelFilesWithSmallBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithSmallBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the mob file compaction batch size + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4); + testCompactDelFiles(tableName, 1, 13, false); + } + + @Test + public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesChangeMaxDelFileCount"; + init(tableName); + // create 20 mob files.
+ createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the max del file count + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5); + // set the mob file compaction batch size + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2); + testCompactDelFiles(tableName, 4, 13, false); + } + + /** + * Tests the selectFiles() method. + * @param tableName the table name + * @param type the expected compaction type + * @param isForceAllFiles whether all the files are selected for compaction + * @param expected the expected start keys + */ + private void testSelectFiles(String tableName, final CompactionType type, + final boolean isForceAllFiles, final List<String> expected) throws IOException { + PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles); + // assert the compaction type + Assert.assertEquals(type, request.type); + // assert get the right partitions + compareCompactedPartitions(expected, request.compactionPartitions); + // assert get the right del files + compareDelFiles(request.delFiles); + return null; + } + }; + compactor.compact(allFiles, isForceAllFiles); + } + + /** + * Tests the compactDelFiles() method. + * @param tableName the table name + * @param expectedFileCount the expected file count + * @param expectedCellCount the expected cell count + * @param isForceAllFiles whether all the files are forced to be compacted + */ + private void testCompactDelFiles(String tableName, final int expectedFileCount, + final int expectedCellCount, boolean isForceAllFiles) throws IOException { + PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request) + throws IOException { + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + // assert the del files are merged. + Assert.assertEquals(expectedFileCount, newDelPaths.size()); + Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); + return null; + } + }; + compactor.compact(allFiles, isForceAllFiles); + } + + /** + * Lists the files in the base path. + */ + private void listFiles() throws IOException { + for (FileStatus file : fs.listStatus(basePath)) { + allFiles.add(file); + if (file.getPath().getName().endsWith("_del")) { + delFiles.add(file); + } else { + mobFiles.add(file); + } + } + } + + /** + * Compares the compacted partitions. + * @param expected the expected start keys + * @param partitions the collection of CompactionPartitions + */ + private void compareCompactedPartitions(List<String> expected, + Collection<CompactionPartition> partitions) { + List<String> actualKeys = new ArrayList<>(); + for (CompactionPartition partition : partitions) { + actualKeys.add(partition.getPartitionId().getStartKey()); + } + Collections.sort(expected); + Collections.sort(actualKeys); + Assert.assertEquals(expected.size(), actualKeys.size()); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals(expected.get(i), actualKeys.get(i)); + } + } + + /** + * Compares the del files.
+ * @param allDelFiles all the del files + */ + private void compareDelFiles(Collection<FileStatus> allDelFiles) { + int i = 0; + for (FileStatus file : allDelFiles) { + Assert.assertEquals(delFiles.get(i), file); + i++; + } + } + + /** + * Creates store files. + * @param basePath the path to create the files in + * @param family the family name + * @param qualifier the column qualifier + * @param count the store file number + * @param type the key type + */ + private void createStoreFiles(Path basePath, String family, String qualifier, int count, + Type type) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + String startKey = "row_"; + MobFileName mobFileName = null; + for (int i = 0; i < count; i++) { + byte[] startRow = Bytes.toBytes(startKey + i); + if(type.equals(Type.Delete)) { + mobFileName = MobFileName.create(startRow, MobUtils.formatDate( + new Date()), delSuffix); + } + if(type.equals(Type.Put)){ + mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate( + new Date()), mobSuffix); + } + StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), + type, (i+1)*1000); + } + } + + /** + * Writes data to a store file. + * @param writer the store file writer + * @param row the row key + * @param family the family name + * @param qualifier the column qualifier + * @param type the key type + * @param size the size of value + */ + private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family, + byte[] qualifier, Type type, int size) throws IOException { + long now = System.currentTimeMillis(); + try { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + writer.append(new KeyValue(row, family, qualifier, now, type, dummyData)); + } finally { + writer.close(); + } + } + + /** + * Gets the number of del cells in the del files. + * @param paths the del file paths + * @return the number of del cells + */ + private int countDelCellsInDelFiles(List<Path> paths) throws IOException { + List<StoreFile> sfs = new ArrayList<StoreFile>(); + int size = 0; + for(Path path : paths) { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + sfs.add(sf); + } + List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, null, HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(hcd.getMaxVersions()); + long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + long ttl = HStore.determineTTLFromFamily(hcd); + ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null, + scanners, 0L, HConstants.LATEST_TIMESTAMP); + List<Cell> results = new ArrayList<>(); + boolean hasMore = true; ++ + while (hasMore) { + hasMore = scanner.next(results); + size += results.size(); + results.clear(); + } + scanner.close(); + return size; + } + - private static ExecutorService createThreadPool(Configuration conf) { ++ private static ExecutorService createThreadPool() { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, queue,
Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java index 49345e4,0000000..3023849 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java @@@ -1,168 -1,0 +1,169 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.mapreduce; + ++import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestMobSweepJob { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private void writeFileNames(FileSystem fs, Configuration conf, Path path, + String[] filesNames) throws IOException { + // write the names to a sequence file + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, + String.class, String.class); + try { + for (String fileName : filesNames) { + writer.append(fileName, MobConstants.EMPTY_STRING); + } + } finally { + IOUtils.closeStream(writer); + } + } + + @Test + public void testSweeperJobWithOutUnusedFile() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration configuration = new Configuration( + TEST_UTIL.getConfiguration()); + Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/0/visited"); + Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/0/all"); + configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, + vistiedFileNamesPath.toString()); + configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, + allFileNamesPath.toString()); + + writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1", + "2", "3", "4", "5", "6"}); + + Path r0 = new Path(vistiedFileNamesPath, "r0"); + writeFileNames(fs, configuration, r0, new String[] { "1", + "2", "3"}); + Path r1 = new Path(vistiedFileNamesPath, "r1"); + writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"}); + Path r2 = new Path(vistiedFileNamesPath, "r2"); + writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"}); + + SweepJob sweepJob = new SweepJob(configuration, fs); + List<String> toBeArchived = sweepJob.getUnusedFiles(configuration); + + assertEquals(0, toBeArchived.size()); + } + + @Test + public void testSweeperJobWithUnusedFile() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration configuration = new Configuration( + 
+      TEST_UTIL.getConfiguration());
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+      "/hbase/mobcompaction/SweepJob/working/names/1/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+      "/hbase/mobcompaction/SweepJob/working/names/1/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+      visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+      allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+      "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(visitedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+      "2", "3"});
+    Path r1 = new Path(visitedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
+    Path r2 = new Path(visitedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(2, toBeArchived.size());
-     assertEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
++    assertArrayEquals(new String[]{"4", "6"}, toBeArchived.toArray(new String[0]));
+  }
+
+  @Test
+  public void testSweeperJobWithRedundantFile() throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(
+      TEST_UTIL.getConfiguration());
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+      "/hbase/mobcompaction/SweepJob/working/names/2/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+      "/hbase/mobcompaction/SweepJob/working/names/2/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+      visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+      allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+      "2", "3", "4", "5", "6"});
+
+    Path r0 = new Path(visitedFileNamesPath, "r0");
+    writeFileNames(fs, configuration, r0, new String[] { "1",
+      "2", "3"});
+    Path r1 = new Path(visitedFileNamesPath, "r1");
+    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
+    Path r2 = new Path(visitedFileNamesPath, "r2");
+    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+    assertEquals(0, toBeArchived.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index 308b50e,0000000..8c24123
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@@ -1,220 -1,0 +1,219 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testSweepReducer";
+  private final static String row = "row";
+  private final static String family = "family";
+  private final static String qf = "qf";
-   private static HTable table;
++  private static BufferedMutator table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(3L);
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++        .getBufferedMutator(TableName.valueOf(tableName));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+      Configuration conf) throws Exception {
+    List<String> list = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+
+    String next = (String) reader.next((String) null);
+    while (next != null) {
+      list.add(next);
+      next = (String) reader.next((String) null);
+    }
+    reader.close();
+    return list;
+  }
+
+  @Test
+  public void testRun() throws Exception {
+
+    TableName tn = TableName.valueOf(tableName);
+    byte[] mobValueBytes = new byte[100];
+
+    // get the path where the mob files are stored
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
+
+    Put put = new Put(Bytes.toBytes(row));
-     put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
-     put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
-     table.put(put);
-     table.put(put2);
-     table.flushCommits();
++    put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++    table.mutate(put);
++    table.mutate(put2);
++    table.flush();
+    admin.flush(tn);
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // check that exactly one mob file has been generated
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+      JavaSerialization.class.getName());
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+      System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+      serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      // use the same counter for all the mocked getCounter calls
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+        KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
+
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+    } finally {
+      lock.release();
+    }
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // a new mob file has been generated and the old one archived
+    assertEquals(1, fileStatuses2.length);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    // check the names recorded in the sequence files of the visited directory
+    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+        st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}
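For context: the three TestMobSweepJob cases above all pin down the same contract of SweepJob.getUnusedFiles. A mob file is reported as unused (and therefore archivable) only when its name appears in the "all" list but in none of the per-region "visited" lists; a visited name that never appears in "all" (the "redundant" case, the "7" above) changes nothing. Below is a minimal self-contained sketch of that set-difference idea in plain Java; it is not the HBase implementation, and the class and method names are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class UnusedFilesSketch {
  /** Returns the names present in 'all' but absent from every visited list. */
  static List<String> unusedFiles(List<String> all, List<List<String>> visitedPerRegion) {
    TreeSet<String> unused = new TreeSet<String>(all);
    for (List<String> visited : visitedPerRegion) {
      unused.removeAll(visited); // visited names missing from 'all' are simply ignored
    }
    return new ArrayList<String>(unused);
  }

  public static void main(String[] args) {
    // mirrors testSweeperJobWithUnusedFile: "4" and "6" are never visited
    List<String> all = Arrays.asList("1", "2", "3", "4", "5", "6");
    List<List<String>> visited = Arrays.asList(
      Arrays.asList("1", "2", "3"),
      Arrays.asList("1", "5"),
      Arrays.asList("2", "3"));
    System.out.println(unusedFiles(all, visited)); // prints [4, 6]
  }
}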

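A related note on the pool built by createThreadPool in TestMobFileCompactor above: pairing a SynchronousQueue with a RejectedExecutionHandler that calls queue.put(r) turns pool saturation into back-pressure, so the submitting thread blocks until a worker frees up instead of receiving a RejectedExecutionException. The following self-contained sketch shows the same pattern; it is not the HBase helper itself, and the class and method names are illustrative only.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffPool {
  static ExecutorService create(int maxThreads) {
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
      new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          try {
            // Every worker is busy: hand the task off once a worker polls the
            // queue again, blocking the submitter instead of failing fast.
            queue.put(r);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
          }
        }
      });
    // Let idle workers time out so the pool shrinks between bursts of work.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}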