HBASE-16301 Trigger flush without waiting when compaction is disabled on a
table (huaxiang sun)
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/12549de4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/12549de4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/12549de4
Branch: refs/heads/branch-1
Commit: 12549de43c9185772e7b6000a184226f4f33caca
Parents: b06119e
Author: Enis Soztutar <[email protected]>
Authored: Tue Aug 2 18:46:07 2016 -0700
Committer: Enis Soztutar <[email protected]>
Committed: Wed Aug 3 10:55:08 2016 -0700
----------------------------------------------------------------------
.../hbase/regionserver/MemStoreFlusher.java | 6 ++
.../regionserver/TestCompactSplitThread.java | 84 ++++++++++++++++++--
2 files changed, 85 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/12549de4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 941543c..6c2dfb2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -547,6 +547,12 @@ class MemStoreFlusher implements FlushRequester {
}
private boolean isTooManyStoreFiles(Region region) {
+
+ // When compaction is disabled, the region is flushable
+ if (!region.getTableDesc().isCompactionEnabled()) {
+ return false;
+ }
+
for (Store store : region.getStores()) {
if (store.hasTooManyStoreFiles()) {
return true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/12549de4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index f57ade9..c845999 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -17,17 +17,26 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import
org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@@ -39,15 +48,61 @@ public class TestCompactSplitThread {
private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
private final TableName tableName =
TableName.valueOf(getClass().getSimpleName());
private final byte[] family = Bytes.toBytes("f");
+ private static final int NUM_RS = 1;
+ private static final int blockingStoreFiles = 3;
+ private static Path rootDir;
+ private static FileSystem fs;
- @Test
- public void testThreadPoolSizeTuning() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
+
+
+ /**
+ * Applies the test configuration, starts the mini cluster (passing NUM_RS),
+ * and stores the DFS file system and the master's root directory in the
+ * static fields of this class.
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final Configuration conf = TEST_UTIL.getConfiguration();
+ setupConf(conf);
+ TEST_UTIL.startMiniCluster(NUM_RS);
+ rootDir = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ }
+
+ /**
+ * Populates the configuration that setupCluster() passes in before it starts
+ * the mini cluster.
+ *
+ * @param conf configuration to modify in place
+ */
+ private static void setupConf(Configuration conf) {
+ // disable the ui
+ conf.setInt("hbase.regionserver.info.port", -1);
+ // so make sure we get a compaction when doing a load, but keep around some
+ // files in the store
+ conf.setInt("hbase.hstore.compaction.min", 2);
+ conf.setInt("hbase.hstore.compactionThreshold", 5);
+ // change the flush size to a small amount, regulating number of store files
+ conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+
+ // block writes if we get to blockingStoreFiles store files
+ conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles);
+ // thread counts for the large/small compaction, split and merge pools
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
- TEST_UTIL.startMiniCluster(1);
+ }
+
+ /**
+ * Deletes the table named by {@code tableName} after each test method.
+ */
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.deleteTable(tableName);
+ }
+
+ /**
+ * Shuts the mini cluster down once the tests in this class have run.
+ */
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ TEST_UTIL.shutdownMiniCluster();
+ } catch (Exception ignored) {
+ // Intentionally swallowed: a shutdown failure is not reported by this hook.
+ }
+ }
+
+ @Test
+ public void testThreadPoolSizeTuning() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
Connection conn = ConnectionFactory.createConnection(conf);
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
@@ -98,7 +153,26 @@ public class TestCompactSplitThread {
assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
} finally {
conn.close();
- TEST_UTIL.shutdownMiniCluster();
}
}
+
+ /**
+ * Creates a table with compaction disabled, loads and flushes it
+ * blockingStoreFiles + 1 times, then checks that more than
+ * blockingStoreFiles + 1 HFiles are listed under the table directory.
+ */
+ @Test(timeout = 60000)
+ public void testFlushWithTableCompactionDisabled() throws Exception {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.setCompactionEnabled(false);
+ TEST_UTIL.createTable(htd, new byte[][] { family }, null);
+
+ // load the table; reuse a single Table handle and always release it
+ Table table = TEST_UTIL.getConnection().getTable(tableName);
+ try {
+ for (int i = 0; i < blockingStoreFiles + 1; i++) {
+ TEST_UTIL.loadTable(table, family);
+ TEST_UTIL.flush(tableName);
+ }
+ } finally {
+ table.close();
+ }
+
+ // Make sure that store file number is greater than blockingStoreFiles + 1
+ Path tableDir = FSUtils.getTableDir(rootDir, tableName);
+ Collection<String> hfiles = SnapshotTestingUtils.listHFileNames(fs, tableDir);
+ // Use a JUnit assertion: the bare assert keyword is skipped when the JVM runs without -ea
+ Assert.assertTrue("expected more than " + (blockingStoreFiles + 1)
+ + " store files but found " + hfiles.size(),
+ hfiles.size() > blockingStoreFiles + 1);
+ }
}