Repository: hbase Updated Branches: refs/heads/0.98 d50aecec5 -> 09becf097
http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 62535e0..5254b77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -39,14 +39,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.RegionTransition; @@ -74,9 +73,10 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 101402d..0a21569 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.io.compress.Compression; @@ -63,9 +62,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -358,7 +359,7 @@ public class TestStore { Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); // after compact; check the lowest time stamp - store.compact(store.requestCompaction()); + store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 2a918e7..5471ee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -120,7 +121,8 @@ public class TestStripeCompactor { StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List<Path> paths = - sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo); + sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, + NoLimitCompactionThroughputController.INSTANCE); writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); @@ -155,8 +157,9 @@ public class TestStripeCompactor { byte[] left, byte[] right, KeyValue[][] output) throws Exception { StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); - List<Path> paths = sc.compact( - createDummyRequest(), targetCount, targetSize, left, right, null, null); + List<Path> paths = + sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, + NoLimitCompactionThroughputController.INSTANCE); assertEquals(output.length, paths.size()); writers.verifyKvs(output, true, true); List<byte[]> boundaries = new ArrayList<byte[]>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 2c7637b..4e002de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -17,8 +17,16 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -62,9 +72,10 @@ public class TestStripeStoreEngine { TestStoreEngine se = createEngine(conf); StripeCompactor mockCompactor = mock(StripeCompactor.class); se.setCompactorOverride(mockCompactor); - when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), - any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class))) - .thenReturn(new ArrayList<Path>()); + when( + mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), + any(byte[].class), any(byte[].class), any(byte[].class), + any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>()); // Produce 3 L0 files. StoreFile sf = createFile(); @@ -82,9 +93,10 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. - compaction.compact(); + compaction.compact(NoLimitCompactionThroughputController.INSTANCE); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, - StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null); + StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, + NoLimitCompactionThroughputController.INSTANCE); } private static StoreFile createFile() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java new file mode 100644 index 0000000..586f363 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; +import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class }) +public class TestCompactionWithThroughputController { + + private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (HRegion region : hrs.getOnlineRegions(tableName)) { + return region.getStores().values().iterator().next(); + } + } + return null; + } + + private Store prepareData() throws IOException, InterruptedException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTable table = TEST_UTIL.createTable(tableName, family); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value)); + } + admin.flush(tableName.getName()); + } + return getStoreWithName(tableName); + } + + private long testCompactionWithThroughputLimit() throws Exception { + long throughputLimit = 1024L * 1024; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.MIN_KEY, 100); + conf.setInt(CompactionConfiguration.MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + throughputLimit); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + assertEquals(10, store.getStorefilesCount()); + long startTime = System.currentTimeMillis(); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName()); + Thread.sleep(5000); + assertEquals(10, store.getStorefilesCount()); + while (store.getStorefilesCount() != 1) { + Thread.sleep(20); + } + long duration = System.currentTimeMillis() - startTime; + double throughput = (double) store.getStorefilesSize() / duration * 1000; + // confirm that the speed limit work properly(not too fast, and also not too slow) + // 20% is the max acceptable error rate. + assertTrue(throughput < throughputLimit * 1.2); + assertTrue(throughput > throughputLimit * 0.8); + return System.currentTimeMillis() - startTime; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + private long testCompactionWithoutThroughputLimit() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.MIN_KEY, 100); + conf.setInt(CompactionConfiguration.MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitCompactionThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + assertEquals(10, store.getStorefilesCount()); + long startTime = System.currentTimeMillis(); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName()); + while (store.getStorefilesCount() != 1) { + Thread.sleep(20); + } + return System.currentTimeMillis() - startTime; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testCompaction() throws Exception { + long limitTime = testCompactionWithThroughputLimit(); + long noLimitTime = testCompactionWithoutThroughputLimit(); + LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " + + noLimitTime + "ms"); + // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this + // is a very weak assumption. + assertTrue(limitTime > noLimitTime * 2); + } + + /** + * Test the tuning task of {@link PressureAwareCompactionThroughputController} + */ + @Test + public void testThroughputTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + 20L * 1024 * 1024); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + 10L * 1024 * 1024); + conf.setInt(CompactionConfiguration.MIN_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + conf.setInt( + PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, + 1000); + TEST_UTIL.startMiniCluster(1); + HConnection conn = HConnectionManager.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName.getName()); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + PressureAwareCompactionThroughputController throughputController = + (PressureAwareCompactionThroughputController) regionServer.compactSplitThread + .getCompactionThroughputController(); + assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + HTableInterface table = conn.getTable(tableName); + for (int i = 0; i < 5; i++) { + table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + } + Thread.sleep(2000); + assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + + table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + + table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } + + /** + * Test the logic that we calculate compaction pressure for a striped store. + */ + @Test + public void testGetCompactionPressureForStripedStore() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); + conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); + conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); + conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); + TEST_UTIL.startMiniCluster(1); + HConnection conn = HConnectionManager.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName.getName()); + HStore store = (HStore) getStoreWithName(tableName); + assertEquals(0, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + HTableInterface table = conn.getTable(tableName); + for (int i = 0; i < 4; i++) { + table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + } + assertEquals(8, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(10, store.getStorefilesCount()); + assertEquals(0.5, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(12, store.getStorefilesCount()); + assertEquals(1.0, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(14, store.getStorefilesCount()); + assertEquals(2.0, store.getCompactionPressure(), EPSILON); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/09becf09/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 269f68a..2b51502 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -208,9 +209,10 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc); - verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), - aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY)); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), + aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), + any(NoLimitCompactionThroughputController.class)); } @Test @@ -442,7 +444,7 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List<Path> paths = scr.execute(sc); + List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); assertEquals(1, paths.size()); } @@ -501,22 +503,21 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc); - verify(sc, times(1)).compact(eq(scr.getRequest()), argThat( - new ArgumentMatcher<List<byte[]>>() { - @Override - public boolean matches(Object argument) { - @SuppressWarnings("unchecked") - List<byte[]> other = (List<byte[]>)argument; - if (other.size() != boundaries.size()) return false; - for (int i = 0; i < other.size(); ++i) { - if (!Bytes.equals(other.get(i), boundaries.get(i))) return false; - } - return true; - } - }), - dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), - dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo)); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { + @Override + public boolean matches(Object argument) { + @SuppressWarnings("unchecked") + List<byte[]> other = (List<byte[]>) argument; + if (other.size() != boundaries.size()) return false; + for (int i = 0; i < other.size(); ++i) { + if (!Bytes.equals(other.get(i), boundaries.get(i))) return false; + } + return true; + } + }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), + dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), + any(NoLimitCompactionThroughputController.class)); } /** @@ -537,11 +538,12 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); verify(sc, times(1)).compact(eq(scr.getRequest()), - count == null ? anyInt() : eq(count.intValue()), - size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), - dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end)); + count == null ? anyInt() : eq(count.intValue()), + size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), + dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), + any(NoLimitCompactionThroughputController.class)); } /** Verify arbitrary flush. */ @@ -727,7 +729,10 @@ public class TestStripeCompactionPolicy { HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo")); StoreFileWritersCapture writers = new StoreFileWritersCapture(); Store store = mock(Store.class); + HRegionInfo info = mock(HRegionInfo.class); + when(info.getRegionNameAsString()).thenReturn("testRegion"); when(store.getFamily()).thenReturn(col); + when(store.getRegionInfo()).thenReturn(info); when( store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
