http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java index 9a8b7d9,0000000..4bf1623 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java @@@ -1,652 -1,0 +1,652 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; - import org.apache.hadoop.hbase.LargeTests; ++import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import 
org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link PartitionedMobFileCompactor} against a one-node mini cluster.
+ *
+ * Every test creates a fresh pre-split table with two MOB-enabled families, loads the same
+ * rows into both, and then compacts <em>family1 only</em>. The assertions on family2 check
+ * that compacting one family leaves the other family's files as they were.
+ */
+@Category(LargeTests.class)
+public class TestMobFileCompactor {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private Configuration conf = null;
+  private String tableNameAsString;
+  private TableName tableName;
+  // NOTE(review): the fields below are static but are (re)assigned in the per-test setUp().
+  private static HTable hTable;
+  private static Admin admin;
+  private static HTableDescriptor desc;
+  private static HColumnDescriptor hcd1;
+  private static HColumnDescriptor hcd2;
+  private static FileSystem fs;
+  private final static String family1 = "family1";
+  private final static String family2 = "family2";
+  private final static String qf1 = "qualifier1";
+  private final static String qf2 = "qualifier2";
+  // One byte per region: each byte is used as a row-key prefix and (except the first) as a
+  // split key, see getSplitKeys().
+  private static byte[] KEYS = Bytes.toBytes("012");
+  private static int regionNum = KEYS.length;
+  // Rows that disappear per region after createDelFile(): only the row delete removes a
+  // whole row.
+  private static int delRowNum = 1;
+  // Cells that disappear per region after createDelFile():
+  // 2 (family1 delete) + 3 (row delete) + 1 (single column delete).
+  private static int delCellNum = 6;
+  // loadData() writes family1:qf1, family1:qf2 and family2:qf1 for every row.
+  private static int cellNumPerRow = 3;
+  // loadData() flushes after this many rows.
+  private static int rowNumPerFile = 2;
+  private static ExecutorService pool;
+
+  /**
+   * Starts a one-node mini cluster and creates the thread pool handed to the compactor.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.startMiniCluster(1);
+    pool = createThreadPool(TEST_UTIL.getConfiguration());
+  }
+
+  /**
+   * Shuts down the compaction thread pool and the mini cluster.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    pool.shutdown();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates a uniquely named table with two MOB-enabled families, pre-split with
+   * getSplitKeys(), and opens a client-side buffered HTable on it.
+   */
+  @Before
+  public void setUp() throws Exception {
+    fs = TEST_UTIL.getTestFileSystem();
+    conf = TEST_UTIL.getConfiguration();
+    // The current time makes the table name unique for each test method.
+    long tid = System.currentTimeMillis();
+    tableNameAsString = "testMob" + tid;
+    tableName = TableName.valueOf(tableNameAsString);
+    hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    // NOTE(review): threshold 0 presumably forces every value to be stored as a MOB - confirm.
+    hcd1.setMobThreshold(0L);
+    hcd1.setMaxVersions(4);
+    hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0L);
+    hcd2.setMaxVersions(4);
+    desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    desc.addFamily(hcd2);
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc, getSplitKeys());
+    hTable = new HTable(conf, tableNameAsString);
+    // Writes are buffered on the client; loadData()/createDelFile() call flushCommits().
+    hTable.setAutoFlush(false, false);
+  }
+
+  /**
+   * Drops the per-test table, closes the client handles and deletes the test data directory.
+   */
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    hTable.close();
+    fs.delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  /**
+   * Without any deletes, compacting family1 must keep every row and leave exactly one mob
+   * file per region.
+   */
+  @Test
+  public void testCompactionWithoutDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob file count", regionNum*count, countFiles(true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(false, family1));
+
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After compaction: mob file count", regionNum, countFiles(true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(false, family1));
+  }
+
+  /**
+   * With del files present, compacting family1 must leave one mob file per region and no
+   * del file in family1, keep family2 untouched, and leave every remaining mob reference
+   * pointing at an existing file.
+   */
+  @Test
+  public void testCompactionWithDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+    assertRefFileNameEqual(family1);
+  }
+
+  /**
+   * Asserts that the set of mob file names referenced by the cells of the given family
+   * equals the set of non-del files that actually exist in that family's mob directory.
+   * @param familyName the family to check
+   */
+  private void assertRefFileNameEqual(String familyName) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(familyName));
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = hTable.getScanner(scan);
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        tableName), familyName);
+    List<Path> actualFilePaths = new ArrayList<>();
+    List<Path> expectFilePaths = new ArrayList<>();
+    for (Result res : results) {
+      for (Cell cell : res.listCells()) {
+        byte[] referenceValue = CellUtil.cloneValue(cell);
+        // The raw value is read as a Bytes.SIZEOF_INT-byte prefix followed by the file name.
+        // NOTE(review): the prefix is presumably the original value length - confirm.
+        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
+            referenceValue.length - Bytes.SIZEOF_INT);
+        Path targetPath = new Path(mobFamilyPath, fileName);
+        // Many cells share one mob file; record each referenced file once.
+        if(!actualFilePaths.contains(targetPath)) {
+          actualFilePaths.add(targetPath);
+        }
+      }
+    }
+    results.close();
+    if (fs.exists(mobFamilyPath)) {
+      FileStatus[] files = fs.listStatus(mobFamilyPath);
+      for (FileStatus file : files) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          expectFilePaths.add(file.getPath());
+        }
+      }
+    }
+    // Sort both sides so the list comparison is order-independent.
+    Collections.sort(actualFilePaths);
+    Collections.sort(expectFilePaths);
+    assertEquals(expectFilePaths, actualFilePaths);
+  }
+
+  /**
+   * With a mergeable threshold of 5000 bytes, files larger than the threshold must survive
+   * the compaction unmerged, so family1 ends up with those files plus one merged file per
+   * region, and its del files are kept.
+   */
+  @Test
+  public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
+    resetConf();
+    int mergeSize = 5000;
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
+
+    // Counted before the deletes so the expectation below is based on the loaded files only.
+    int largeFilesCount = countLargeFiles(mergeSize, family1);
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    // After the compaction, the files smaller than the mob compaction merge size
+    // are merged into one file per region
+    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+  }
+
+  /**
+   * With a compaction batch size of 2 and 4 mob files per region, family1 must end up with
+   * count/batchSize mob files per region and no del file.
+   */
+  @Test
+  public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
+    resetConf();
+    int batchSize = 2;
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+  }
+
+  /**
+   * Compacts, restores a snapshot taken before the deletes, and compacts again.
+   * After the restore the test expects HFileLinks in family1's mob directory; the second
+   * compaction must leave one mob file per region in family1 and no HFileLink.
+   */
+  @Test
+  public void testCompactionWithHFileLink() throws IOException, InterruptedException {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    long tid = System.currentTimeMillis();
+    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
+    // take a snapshot
+    admin.snapshot(snapshotName1, tableName);
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After first compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After first compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After first compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After first compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After first compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+    assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+    assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+
+    admin.disableTable(tableName);
+    // Restore from snapshot, the hfilelink will exist in mob dir
+    admin.restoreSnapshot(snapshotName1);
+    admin.enableTable(tableName);
+
+    // The snapshot predates createDelFile(), so the full row and cell counts are expected.
+    assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After restoring snapshot: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After restoring snapshot: family1 del file count", 0,
+        countFiles(false, family1));
+    assertEquals("After restoring snapshot: family2 del file count", 0,
+        countFiles(false, family2));
+    assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
+        countHFileLinks(family1));
+    assertEquals("After restoring snapshot: family2 hfilelink count", 0,
+        countHFileLinks(family2));
+
+    compactor.compact();
+
+    assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After second compaction: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After second compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After second compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After second compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
+    assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+    assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+  }
+
+  /**
+   * Gets the number of rows in the given table.
+ * @param table to get the scanner + * @return the number of rows + */ + private int countMobRows(final HTable table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count++; + } + results.close(); + return count; + } + + /** + * Gets the number of cells in the given table. + * @param table to get the scanner + * @return the number of cells + */ + private int countMobCells(final HTable table) throws IOException { + Scan scan = new Scan(); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + for (Cell cell : res.listCells()) { + count++; + } + } + results.close(); + return count; + } + + /** + * Gets the number of files in the mob path. + * @param isMobFile gets number of the mob files or del files + * @param familyName the family name + * @return the number of the files + */ + private int countFiles(boolean isMobFile, String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (isMobFile == true) { + if (!StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } else { + if (StoreFileInfo.isDelFile(file.getPath())) { + count++; + } + } + } + } + return count; + } + + /** + * Gets the number of HFileLink in the mob path. 
+ * @param familyName the family name + * @return the number of the HFileLink + */ + private int countHFileLinks(String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + if (HFileLink.isHFileLink(file.getPath())) { + count++; + } + } + } + return count; + } + + /** + * Gets the number of files. + * @param size the size of the file + * @param familyName the family name + * @return the number of files large than the size + */ + private int countLargeFiles(int size, String familyName) throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath( + MobUtils.getMobRegionPath(conf, tableName), familyName); + int count = 0; + if (fs.exists(mobDirPath)) { + FileStatus[] files = fs.listStatus(mobDirPath); + for (FileStatus file : files) { + // ignore the del files in the mob path + if ((!StoreFileInfo.isDelFile(file.getPath())) + && (file.getLen() > size)) { + count++; + } + } + } + return count; + } + + /** + * loads some data to the table. 
+ * @param count the mob file number + */ + private void loadData(int fileNum, int rowNumPerFile) throws IOException, + InterruptedException { + if (fileNum <= 0) { + throw new IllegalArgumentException(); + } + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + for (int i = 0; i < fileNum * rowNumPerFile; i++) { + byte[] key = Bytes.add(k, Bytes.toBytes(i)); + byte[] mobVal = makeDummyData(10 * (i + 1)); + Put put = new Put(key); + put.setDurability(Durability.SKIP_WAL); + put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal); + put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal); + put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal); + hTable.put(put); + if ((i + 1) % rowNumPerFile == 0) { + hTable.flushCommits(); + admin.flush(tableName); + } + } + } + } + + /** + * delete the row, family and cell to create the del file + */ + private void createDelFile() throws IOException, InterruptedException { + for (byte k0 : KEYS) { + byte[] k = new byte[] { k0 }; + // delete a family + byte[] key1 = Bytes.add(k, Bytes.toBytes(0)); + Delete delete1 = new Delete(key1); + delete1.deleteFamily(Bytes.toBytes(family1)); + hTable.delete(delete1); + // delete one row + byte[] key2 = Bytes.add(k, Bytes.toBytes(2)); + Delete delete2 = new Delete(key2); + hTable.delete(delete2); + // delete one cell + byte[] key3 = Bytes.add(k, Bytes.toBytes(4)); + Delete delete3 = new Delete(key3); + delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1)); + hTable.delete(delete3); + hTable.flushCommits(); + admin.flush(tableName); + List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions( + Bytes.toBytes(tableNameAsString)); + for (HRegion region : regions) { + region.waitForFlushesAndCompactions(); + region.compactStores(true); + } + } + } + /** + * Creates the dummy data with a specific size. 
+ * @param the size of data + * @return the dummy data + */ + private byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + return dummyData; + } + + /** + * Gets the split keys + */ + public static byte[][] getSplitKeys() { + byte[][] splitKeys = new byte[KEYS.length - 1][]; + for (int i = 0; i < splitKeys.length; ++i) { + splitKeys[i] = new byte[] { KEYS[i + 1] }; + } + return splitKeys; + } + + private static ExecutorService createThreadPool(Configuration conf) { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, + keepAliveTime, TimeUnit.SECONDS, queue, + Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE); + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java index ac66d95,0000000..f9159aa mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java @@@ -1,60 -1,0 +1,60 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; - import org.apache.hadoop.hbase.SmallTests; ++import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestPartitionedMobFileCompactionRequest { + + @Test + public void testCompactedPartitionId() { + String startKey1 = "startKey1"; + String startKey2 = "startKey2"; + String date1 = "date1"; + String date2 = "date2"; + CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, date1); + CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, date2); + CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, date2); + + Assert.assertTrue(partitionId1.equals(partitionId1)); + Assert.assertFalse(partitionId1.equals(partitionId2)); + Assert.assertFalse(partitionId1.equals(partitionId3)); + Assert.assertFalse(partitionId2.equals(partitionId3)); + + Assert.assertEquals(startKey1, partitionId1.getStartKey()); + Assert.assertEquals(date1, partitionId1.getDate()); + } + + @Test + public void testCompactedPartition() { + CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", "date1"); + CompactionPartition partition = new CompactionPartition(partitionId); + FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test")); + partition.addFile(file); + Assert.assertEquals(file, partition.listFiles().get(0)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java 
---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java index 1d64c0c,0000000..12c88b2 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java @@@ -1,423 -1,0 +1,423 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; - import org.apache.hadoop.hbase.LargeTests; ++import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; +import 
org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPartitionedMobFileCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String family = "family"; + private final static String qf = "qf"; + private HColumnDescriptor hcd = new HColumnDescriptor(family); + private Configuration conf = TEST_UTIL.getConfiguration(); + private CacheConfig cacheConf = new CacheConfig(conf); + private FileSystem fs; + private List<FileStatus> mobFiles = new ArrayList<>(); + private List<FileStatus> delFiles = new ArrayList<>(); + private List<FileStatus> allFiles = new ArrayList<>(); + private Path basePath; + private String mobSuffix; + private String delSuffix; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.startMiniCluster(1); + pool = createThreadPool(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); + TEST_UTIL.shutdownMiniCluster(); + } + + private void init(String tableName) throws Exception { + fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); + basePath = new Path(new Path(mobTestDir, tableName), 
family); + mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); + delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + } + + @Test + public void testCompactionSelectWithAllFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithAllFiles"; + init(tableName); + int count = 10; + // create 10 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < mergeSize) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys); + } + + @Test + public void testCompactionSelectWithPartFiles() throws Exception { + resetConf(); + String tableName = "testCompactionSelectWithPartFiles"; + init(tableName); + int count = 10; + // create 10 mob files. 
+ createStoreFiles(basePath, family, qf, count, Type.Put); + // create 10 del files + createStoreFiles(basePath, family, qf, count, Type.Delete); + listFiles(); + long mergeSize = 4000; + List<String> expectedStartKeys = new ArrayList<>(); + for(FileStatus file : mobFiles) { + if(file.getLen() < 4000) { + String fileName = file.getPath().getName(); + String startKey = fileName.substring(0, 32); + expectedStartKeys.add(startKey); + } + } + // set the mob file compaction mergeable threshold + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys); + } + + @Test + public void testCompactDelFilesWithDefaultBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithDefaultBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + testCompactDelFiles(tableName, 1, 13); + } + + @Test + public void testCompactDelFilesWithSmallBatchSize() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithSmallBatchSize"; + init(tableName); + // create 20 mob files. + createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the mob file compaction batch size + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4); + testCompactDelFiles(tableName, 1, 13); + } + + @Test + public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { + resetConf(); + String tableName = "testCompactDelFilesWithSmallBatchSize"; + init(tableName); + // create 20 mob files. 
+ createStoreFiles(basePath, family, qf, 20, Type.Put); + // create 13 del files + createStoreFiles(basePath, family, qf, 13, Type.Delete); + listFiles(); + + // set the max del file count + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5); + // set the mob file compaction batch size + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2); + testCompactDelFiles(tableName, 4, 13); + } + + /** + * Tests the selectFiles + * @param tableName the table name + * @param type the expected compaction type + * @param expected the expected start keys + */ + private void testSelectFiles(String tableName, final CompactionType type, + final List<String> expected) throws IOException { + PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + public List<Path> compact(List<FileStatus> files) throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + PartitionedMobFileCompactionRequest request = select(files); + // assert the compaction type is ALL_FILES + Assert.assertEquals(type, request.type); + // assert get the right partitions + compareCompactedPartitions(expected, request.compactionPartitions); + // assert get the right del files + compareDelFiles(request.delFiles); + return null; + } + }; + compactor.compact(allFiles); + } + + /** + * Tests the compacteDelFile + * @param tableName the table name + * @param expectedFileCount the expected file count + * @param expectedCellCount the expected cell count + */ + private void testCompactDelFiles(String tableName, final int expectedFileCount, + final int expectedCellCount) throws IOException { + PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request) + throws IOException { + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : 
request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + // assert the del files are merged. + Assert.assertEquals(expectedFileCount, newDelPaths.size()); + Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); + return null; + } + }; + + compactor.compact(allFiles); + } + + /** + * Lists the files in the path + */ + private void listFiles() throws IOException { + for (FileStatus file : fs.listStatus(basePath)) { + allFiles.add(file); + if (file.getPath().getName().endsWith("_del")) { + delFiles.add(file); + } else { + mobFiles.add(file); + } + } + } + + /** + * Compares the compacted partitions. + * @param partitions the collection of CompactedPartitions + */ + private void compareCompactedPartitions(List<String> expected, + Collection<CompactionPartition> partitions) { + List<String> actualKeys = new ArrayList<>(); + for (CompactionPartition partition : partitions) { + actualKeys.add(partition.getPartitionId().getStartKey()); + } + Collections.sort(expected); + Collections.sort(actualKeys); + Assert.assertEquals(expected.size(), actualKeys.size()); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals(expected.get(i), actualKeys.get(i)); + } + } + + /** + * Compares the del files. + * @param allDelFiles all the del files + */ + private void compareDelFiles(Collection<FileStatus> allDelFiles) { + int i = 0; + for (FileStatus file : allDelFiles) { + Assert.assertEquals(delFiles.get(i), file); + i++; + } + } + + /** + * Creates store files. 
+ * @param basePath the path to create file + * @family the family name + * @qualifier the column qualifier + * @count the store file number + * @type the key type + */ + private void createStoreFiles(Path basePath, String family, String qualifier, int count, + Type type) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + String startKey = "row_"; + MobFileName mobFileName = null; + for (int i = 0; i < count; i++) { + byte[] startRow = Bytes.toBytes(startKey + i) ; + if(type.equals(Type.Delete)) { + mobFileName = MobFileName.create(startRow, MobUtils.formatDate( + new Date()), delSuffix); + } + if(type.equals(Type.Put)){ + mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate( + new Date()), mobSuffix); + } + StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), + type, (i+1)*1000); + } + } + + /** + * Writes data to store file. 
+ * @param writer the store file writer + * @param row the row key + * @param family the family name + * @param qualifier the column qualifier + * @param type the key type + * @param size the size of value + */ + private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family, + byte[] qualifier, Type type, int size) throws IOException { + long now = System.currentTimeMillis(); + try { + byte[] dummyData = new byte[size]; + new Random().nextBytes(dummyData); + writer.append(new KeyValue(row, family, qualifier, now, type, dummyData)); + } finally { + writer.close(); + } + } + + /** + * Gets the number of del cell in the del files + * @param paths the del file paths + * @return the cell size + */ + private int countDelCellsInDelFiles(List<Path> paths) throws IOException { + List<StoreFile> sfs = new ArrayList<StoreFile>(); + int size = 0; + for(Path path : paths) { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + sfs.add(sf); + } + List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, null, HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(hcd.getMaxVersions()); + long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + long ttl = HStore.determineTTLFromFamily(hcd); + ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null, + scanners, 0L, HConstants.LATEST_TIMESTAMP); + List<Cell> results = new ArrayList<>(); + boolean hasMore = true; + while (hasMore) { + hasMore = scanner.next(results); + size += results.size(); + results.clear(); + } + scanner.close(); + return size; + } + + private static ExecutorService createThreadPool(Configuration conf) { + int maxThreads = 10; + long keepAliveTime = 60; + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor 
pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Resets the configuration. + */ + private void resetConf() { + conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD); + conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java index e0b9a83,0000000..49345e4 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java @@@ -1,168 -1,0 +1,168 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; - import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; ++import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestMobSweepJob { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws 
Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private void writeFileNames(FileSystem fs, Configuration conf, Path path, + String[] filesNames) throws IOException { + // write the names to a sequence file + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, + String.class, String.class); + try { + for (String fileName : filesNames) { + writer.append(fileName, MobConstants.EMPTY_STRING); + } + } finally { + IOUtils.closeStream(writer); + } + } + + @Test + public void testSweeperJobWithOutUnusedFile() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration configuration = new Configuration( + TEST_UTIL.getConfiguration()); + Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/0/visited"); + Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/0/all"); + configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, + vistiedFileNamesPath.toString()); + configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, + allFileNamesPath.toString()); + + writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1", + "2", "3", "4", "5", "6"}); + + Path r0 = new Path(vistiedFileNamesPath, "r0"); + writeFileNames(fs, configuration, r0, new String[] { "1", + "2", "3"}); + Path r1 = new Path(vistiedFileNamesPath, "r1"); + writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"}); + Path r2 = new Path(vistiedFileNamesPath, "r2"); + writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"}); + + SweepJob sweepJob = new SweepJob(configuration, fs); + List<String> toBeArchived = sweepJob.getUnusedFiles(configuration); + + assertEquals(0, toBeArchived.size()); + } + + @Test + public void testSweeperJobWithUnusedFile() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration configuration = new Configuration( + TEST_UTIL.getConfiguration()); + Path 
vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/1/visited"); + Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/1/all"); + configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, + vistiedFileNamesPath.toString()); + configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, + allFileNamesPath.toString()); + + writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1", + "2", "3", "4", "5", "6"}); + + Path r0 = new Path(vistiedFileNamesPath, "r0"); + writeFileNames(fs, configuration, r0, new String[] { "1", + "2", "3"}); + Path r1 = new Path(vistiedFileNamesPath, "r1"); + writeFileNames(fs, configuration, r1, new String[] { "1", "5"}); + Path r2 = new Path(vistiedFileNamesPath, "r2"); + writeFileNames(fs, configuration, r2, new String[] { "2", "3"}); + + SweepJob sweepJob = new SweepJob(configuration, fs); + List<String> toBeArchived = sweepJob.getUnusedFiles(configuration); + + assertEquals(2, toBeArchived.size()); + assertEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0])); + } + + @Test + public void testSweeperJobWithRedundantFile() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration configuration = new Configuration( + TEST_UTIL.getConfiguration()); + Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/2/visited"); + Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration), + "/hbase/mobcompaction/SweepJob/working/names/2/all"); + configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, + vistiedFileNamesPath.toString()); + configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, + allFileNamesPath.toString()); + + writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1", + "2", "3", "4", "5", "6"}); + + Path r0 = new Path(vistiedFileNamesPath, "r0"); + writeFileNames(fs, configuration, r0, 
new String[] { "1", + "2", "3"}); + Path r1 = new Path(vistiedFileNamesPath, "r1"); + writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"}); + Path r2 = new Path(vistiedFileNamesPath, "r2"); + writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"}); + + SweepJob sweepJob = new SweepJob(configuration, fs); + List<String> toBeArchived = sweepJob.getUnusedFiles(configuration); + + assertEquals(0, toBeArchived.size()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java index 2aa3a4a,0000000..9e95a39 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java @@@ -1,120 -1,0 +1,120 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; - import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@Category(SmallTests.class) +public class TestMobSweepMapper { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + 
TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void TestMap() throws Exception { + String prefix = "0000"; + final String fileName = "19691231f2cd014ea28f42788214560a21a44cef"; + final String mobFilePath = prefix + fileName; + + ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r")); + final KeyValue[] kvList = new KeyValue[1]; + kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), + Bytes.toBytes("column"), Bytes.toBytes(mobFilePath)); + + Result columns = mock(Result.class); - when(columns.raw()).thenReturn(kvList); ++ when(columns.rawCells()).thenReturn(kvList); + + Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable()); + TableName tn = TableName.valueOf("testSweepMapper"); + TableName lockName = MobUtils.getTableLockName(tn); + String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString()); + configuration.set(SweepJob.SWEEP_JOB_ID, "1"); + configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode); + ServerName serverName = SweepJob.getCurrentServerName(configuration); + configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString()); + + TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw, + serverName); + TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool"); + lock.acquire(); + try { + Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx = + mock(Mapper.Context.class); + when(ctx.getConfiguration()).thenReturn(configuration); + SweepMapper map = new SweepMapper(); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Text text = (Text) invocation.getArguments()[0]; + KeyValue kv = (KeyValue) invocation.getArguments()[1]; + + assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName); + assertEquals(0, 
Bytes.compareTo(kv.getKey(), kvList[0].getKey())); + + return null; + } + }).when(ctx).write(any(Text.class), any(KeyValue.class)); + + map.map(r, columns, ctx); + } finally { + lock.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java index 1a69d06,0000000..308b50e mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java @@@ -1,220 -1,0 +1,220 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; - import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; ++import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.counters.GenericCounter; +import org.junit.After; +import org.junit.AfterClass; 
+import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Matchers; + +@Category(MediumTests.class) +public class TestMobSweepReducer { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String tableName = "testSweepReducer"; + private final static String row = "row"; + private final static String family = "family"; + private final static String qf = "qf"; + private static HTable table; + private static Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @SuppressWarnings("deprecation") + @Before + public void setUp() throws Exception { + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMobEnabled(true); + hcd.setMobThreshold(3L); + hcd.setMaxVersions(4); + desc.addFamily(hcd); + + admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc); + table = new HTable(TEST_UTIL.getConfiguration(), tableName); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + admin.close(); + } + + private List<String> getKeyFromSequenceFile(FileSystem fs, Path path, + Configuration conf) throws Exception { + List<String> list = new ArrayList<String>(); + SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path)); + + String next = (String) reader.next((String) null); + while (next != null) { + list.add(next); + 
next = (String) reader.next((String) null);
+    }
+    reader.close();
+    return list;
+  }
+
+  /**
+   * Runs {@link SweepReducer} against a mocked reducer context and checks what it leaves behind.
+   * <p>
+   * Two cells are written and flushed, and the test asserts that exactly one mob file results.
+   * The reducer is then run, while holding the table write lock, with that file name as the
+   * only key of the mocked context. Afterwards the mob family directory must again contain
+   * exactly one file, under a different name, and the keys read back from the directory
+   * configured under {@code SweepJob.WORKING_VISITED_DIR_KEY} must be exactly the original
+   * file name.
+   */
+  @Test
+  public void testRun() throws Exception {
+
+    TableName tn = TableName.valueOf(tableName);
+    byte[] mobValueBytes = new byte[100];
+
+    //get the path where mob files lie in
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(tn);
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //check the generation of a mob file
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    // work on a copy so the settings below do not leak into the shared test configuration
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName());
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    // start date is set one day after the current time
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+        serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      // use the same counter when mocking
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      // exactly one key is offered to the reducer: the mob file written above
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+      // the cell value is the length of the file name followed by the file name bytes
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+          KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
+
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+    } finally {
+      try {
+        lock.release();
+      } finally {
+        // the watcher is not used after this point; close it so the connection is not leaked
+        zkw.close();
+      }
+    }
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //new mob file is generated, old one has been archived
+    // check the count before indexing so an empty listing fails as an assertion
+    assertEquals(1, fileStatuses2.length);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    //test sequence file
+    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+          st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index c4817aa,0000000..1689c2a
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@@ -1,307 -1,0 +1,307 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the {@code Sweeper} tool on a mini HBase + MapReduce cluster.
+ * <p>
+ * Each test writes ten rows into a fresh mob-enabled table, flushing after every row, runs the
+ * tool with a given {@code SweepJob.MOB_SWEEP_JOB_DELAY}, and then compares the files in the
+ * mob family directory with the mob file references read back from the table.
+ */
+@Category(MediumTests.class)
+public class TestMobSweeper {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private String tableName;
+  private final static String row = "row_";
+  private final static String family = "family";
+  private final static String column = "column";
+  private static HTable table;
+  private static Admin admin;
+
+  private Random random = new Random();
+
+  /** Configures the shared test configuration and starts the HBase and MapReduce mini clusters. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15); // avoid major compactions
+    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30); // avoid major compactions
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster();
+
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  /** Stops the mini clusters in the reverse of the order in which they were started. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // the MapReduce cluster was started on top of the HBase cluster, so it goes down first
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates a table with a unique, timestamp-based name and one mob-enabled family
+   * (mob threshold 3, max versions 4), and opens a client-buffered handle on it.
+   */
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    long tid = System.currentTimeMillis();
+    tableName = "testSweeper" + tid;
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(3L);
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false);
+
+  }
+
+  /** Closes the table handle, then disables and deletes the table created by {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    // release the client handle opened in setUp before dropping the table
+    table.close();
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  /**
+   * Builds the path of a family directory under the table's mob region path.
+   *
+   * @param conf configuration passed to {@code MobUtils.getMobRegionPath}
+   * @param tableNameStr name of the table
+   * @param familyName name of the column family
+   * @return the mob region path of the table with {@code familyName} appended
+   */
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+      String familyName) {
+    return new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
+        familyName);
+  }
+
+  /**
+   * Concatenates the elements of {@code set}, in iteration order, without a separator.
+   *
+   * @param set strings to concatenate
+   * @return the concatenation, or an empty string for an empty set
+   */
+  private String mergeString(Set<String> set) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : set) {
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Writes {@code count} rows, each holding a random 101 KB value, into the test table.
+   * The table is flushed whenever the row index is a multiple of {@code flushStep}, and once
+   * more after the last row. Does nothing if either argument is not positive.
+   *
+   * @param count number of rows to write
+   * @param flushStep flush interval, in rows
+   */
+  private void generateMobTable(int count, int flushStep)
+      throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0) {
+      return;
+    }
+    for (int i = 0; i < count; i++) {
+      byte[] mobVal = new byte[101*1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + i));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+      if (i % flushStep == 0) {
+        table.flushCommits();
+        admin.flush(TableName.valueOf(tableName));
+      }
+    }
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+  }
+
+  /**
+   * Lists the mob family directory of the test table.
+   *
+   * @return the sorted names of the files currently in that directory
+   */
+  private TreeSet<String> listMobFileNames() throws IOException {
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+    return mobFilesSet;
+  }
+
+  /**
+   * Scans the test table with the {@code MOB_SCAN_RAW} and {@code MOB_SCAN_REF_ONLY}
+   * attributes set and collects the file name carried by each returned value.
+   *
+   * @return the sorted, de-duplicated file names referenced by the scanned cells
+   */
+  private TreeSet<String> scanMobFileReferences() throws IOException {
+    //scan the table, retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    ResultScanner rs = table.getScanner(scan);
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+            Bytes.toBytes(column));
+        // the first SIZEOF_INT bytes are skipped; the remainder is read as the file name
+        mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      // always release the scanner, also when an assertion or I/O error interrupts the loop
+      rs.close();
+    }
+    return mobFilesScanned;
+  }
+
+  /**
+   * Shared body of the sweeper tests: generates ten mob files, checks that the table references
+   * exactly those files, runs the {@code Sweeper} tool with the given delay and verifies how
+   * many mob files and references are left.
+   *
+   * @param sweepDelay value stored under {@code SweepJob.MOB_SWEEP_JOB_DELAY} before the run
+   * @param expectedAfterSweep number of mob files, and of distinct references, expected after
+   *          the tool has run
+   */
+  private void runSweeperAndVerify(long sweepDelay, int expectedAfterSweep) throws Exception {
+    int count = 10;
+    //create table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    //get mob files
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = listMobFileNames();
+    TreeSet<String> mobFilesScanned = scanMobFileReferences();
+
+    //there should be 10 mob files
+    assertEquals(count, mobFilesScanned.size());
+    //check if we store the correct reference of mob files
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, sweepDelay);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+    assertEquals(expectedAfterSweep, listMobFileNames().size());
+
+    TreeSet<String> mobFilesScannedAfterJob = scanMobFileReferences();
+    assertEquals(expectedAfterSweep, mobFilesScannedAfterJob.size());
+
+    mobFilesSet = listMobFileNames();
+    assertEquals(expectedAfterSweep, mobFilesSet.size());
+    // the first referenced file name must match the first file name on disk
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+        .equalsIgnoreCase(mobFilesSet.iterator().next()));
+  }
+
+  /** With the sweep delay set to 24 hours, all ten mob files are expected to remain. */
+  @Test
+  public void testSweeper() throws Exception {
+    runSweeperAndVerify(24L * 60 * 60 * 1000, 10);
+  }
+
+  /** With the sweep delay set to zero, a single mob file is expected to remain. */
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+    runSweeperAndVerify(0, 1);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index c7d146b,852d319..0d28e54 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@@ -242,73 -242,22 +242,92 @@@ public class MetricsRegionServerWrapper } /* Every getter in this hunk returns a hard-coded constant. */ @Override + public long getHedgedReadOps() { + return 100; + } + + @Override + public long getHedgedReadWins() { + return 10; + } + + @Override + public long getBlockedRequestsCount() { + return 0; + } + + @Override + public int getSplitQueueSize() { + return 0; + } ++ ++ /* Fixed values for the MOB metric getters follow. */ ++ @Override + public long getMobCompactedIntoMobCellsCount() { + return 20; + } + + @Override + public long getMobCompactedFromMobCellsCount() { + return 10; + } + + @Override + public long getMobCompactedIntoMobCellsSize() { + return 200; + } + + @Override + public long getMobCompactedFromMobCellsSize() { + return 100; + } + + @Override + public long getMobFlushCount() { + return 1; + } + + @Override + public long getMobFlushedCellsCount() { + return 10; + } + + @Override + public long getMobFlushedCellsSize() { + return 1000; + } + + @Override + public long getMobScanCellsCount() { + return 10; + } + + @Override + public long getMobScanCellsSize() { + return 1000; + } + + @Override + public long getMobFileCacheAccessCount() { + return 100; + } + + @Override + public long getMobFileCacheMissCount() { + return 50; + } + + @Override + public long getMobFileCacheEvictedCount() { + return 0; + } + + @Override + public long getMobFileCacheCount() { + return 100; + } + + @Override + public int getMobFileCacheHitPercent() { + /* agrees with the 100 accesses and 50 misses returned by the getters above */ + return 50; + } - - } + }
