Repository: hbase Updated Branches: refs/heads/hbase-11339 a7f8035ec -> a0d9e18cc
HBASE-12030 Wrong compaction report and assert when MOB compaction switches to minor. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a0d9e18c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a0d9e18c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a0d9e18c Branch: refs/heads/hbase-11339 Commit: a0d9e18ccf3842395a410a6c966f108ab2f55473 Parents: a7f8035 Author: anoopsjohn <[email protected]> Authored: Thu Sep 25 09:47:57 2014 +0530 Committer: anoopsjohn <[email protected]> Committed: Thu Sep 25 09:47:57 2014 +0530 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HMobStore.java | 44 ++++++------ .../compactions/CompactionRequest.java | 19 ++++++ .../compactions/DefaultCompactor.java | 4 +- .../hbase/regionserver/TestMobCompaction.java | 72 ++++++++++++++++++-- 4 files changed, 108 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a0d9e18c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 2cef3df..ce9b81c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -322,19 +322,19 @@ public class HMobStore extends HStore { * In order to avoid this, we need mutually exclude the running of the major compaction and * sweeping in mob files. * The minor compaction is not affected. - * The major compaction is converted to a minor one when a sweeping is in progress. 
+ * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress. */ @Override public List<StoreFile> compact(CompactionContext compaction) throws IOException { // If it's major compaction, try to find whether there's a sweeper is running - // If yes, change the major compaction to a minor one. - if (compaction.getRequest().isMajor()) { + // If yes, mark the major compaction as retainDeleteMarkers + if (compaction.getRequest().isAllFiles()) { // Use the Zookeeper to coordinate. // 1. Acquire a operation lock. - // 1.1. If no, convert the major compaction to a minor one and continue the compaction. + // 1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction. // 1.2. If the lock is obtained, search the node of sweeping. - // 1.2.1. If the node is there, the sweeping is in progress, convert the major - // compaction to a minor one and continue the compaction. + // 1.2.1. If the node is there, the sweeping is in progress, mark the major + // compaction as retainDeleteMarkers and continue the compaction. // 1.2.2. If the node is not there, add a child to the major compaction node, and // run the compaction directly. String compactionName = UUID.randomUUID().toString().replaceAll("-", ""); @@ -342,26 +342,27 @@ public class HMobStore extends HStore { try { zk = MobZookeeper.newInstance(region.getBaseConf(), compactionName); } catch (KeeperException e) { - LOG.error("Cannot connect to the zookeeper, ready to perform the minor compaction instead", - e); - // change the major compaction into a minor one - compaction.getRequest().setIsMajor(false, false); + LOG.error("Cannot connect to the zookeeper, forcing the delete markers to be retained", e); + compaction.getRequest().forceRetainDeleteMarkers(); return super.compact(compaction); } - boolean major = false; + boolean keepDeleteMarkers = true; + boolean majorCompactNodeAdded = false; try { // try to acquire the operation lock. 
if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) { try { LOG.info("Obtain the lock for the store[" + this - + "], ready to perform the major compaction"); + + "], forcing the delete markers to be retained"); // check the sweeping node to find out whether the sweeping is in progress. boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(), getFamily().getNameAsString()); if (!hasSweeper) { // if not, add a child to the major compaction node of this store. - major = zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily() - .getNameAsString(), compactionName); + majorCompactNodeAdded = zk.addMajorCompactionZNode(getTableName().getNameAsString(), + getFamily().getNameAsString(), compactionName); + // If we failed to add the major compact node, go with keep delete markers mode. + keepDeleteMarkers = !majorCompactNodeAdded; } } catch (Exception e) { LOG.error("Fail to handle the Zookeeper", e); @@ -371,17 +372,14 @@ public class HMobStore extends HStore { } } try { - if (major) { - return super.compact(compaction); - } else { - LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" - + this + "], ready to perform the minor compaction instead"); - // change the major compaction into a minor one - compaction.getRequest().setIsMajor(false, false); - return super.compact(compaction); + if (keepDeleteMarkers) { + LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + this + + "], forcing the delete markers to be retained"); + compaction.getRequest().forceRetainDeleteMarkers(); } + return super.compact(compaction); } finally { - if (major) { + if (majorCompactNodeAdded) { try { zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily() .getNameAsString(), compactionName); http://git-wip-us.apache.org/repos/asf/hbase/blob/a0d9e18c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 0fc64d2..76085fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -59,6 +59,8 @@ public class CompactionRequest implements Comparable<CompactionRequest> { private String storeName = ""; private long totalSize = -1L; + private Boolean retainDeleteMarkers = null; + /** * This ctor should be used by coprocessors that want to subclass CompactionRequest. */ @@ -200,6 +202,23 @@ public class CompactionRequest implements Comparable<CompactionRequest> { : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES); } + /** + * Forcefully setting that this compaction has to retain the delete markers in the new compacted + * file, whatever be the type of the compaction.<br> + * Note : By default HBase drops delete markers when the compaction is on all files. + */ + public void forceRetainDeleteMarkers() { + this.retainDeleteMarkers = Boolean.TRUE; + } + + /** + * @return Whether the compaction has to retain the delete markers or not. + */ + public boolean isRetainDeleteMarkers() { + return (this.retainDeleteMarkers != null) ? 
this.retainDeleteMarkers.booleanValue() + : !isAllFiles(); + } + @Override public String toString() { String fsList = Joiner.on(", ").join( http://git-wip-us.apache.org/repos/asf/hbase/blob/a0d9e18c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 8056dd0..be859c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -59,8 +59,8 @@ public class DefaultCompactor extends Compactor { InternalScanner scanner = null; try { /* Include deletes, unless we are doing a compaction of all files */ - ScanType scanType = - request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES; + ScanType scanType = request.isRetainDeleteMarkers() ? 
ScanType.COMPACT_RETAIN_DELETES + : ScanType.COMPACT_DROP_DELETES; scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); if (scanner == null) { scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs); http://git-wip-us.apache.org/repos/asf/hbase/blob/a0d9e18c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java index 51f4de3..3293212 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java @@ -37,13 +37,16 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -53,7 +56,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -99,9 +101,11 @@ public class TestMobCompaction { UTIL.shutdownMiniCluster(); } - private void init(long mobThreshold) throws Exception { + private void init(Configuration conf, long mobThreshold) throws Exception { + this.conf = conf; this.mobCellThreshold = mobThreshold; - conf = UTIL.getConfiguration(); + HBaseTestingUtility UTIL = new HBaseTestingUtility(conf); + compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); htd = UTIL.createTableDescriptor(name.getMethodName()); hcd = new HColumnDescriptor(COLUMN_FAMILY); @@ -125,7 +129,7 @@ public class TestMobCompaction { */ @Test public void testSmallerValue() throws Exception { - init(500); + init(UTIL.getConfiguration(), 500); byte[] dummyData = makeDummyData(300); // smaller than mob threshold HRegionIncommon loader = new HRegionIncommon(region); // one hfile per row @@ -153,7 +157,7 @@ public class TestMobCompaction { */ @Test public void testLargerValue() throws Exception { - init(200); + init(UTIL.getConfiguration(), 200); byte[] dummyData = makeDummyData(300); // larger than mob threshold HRegionIncommon loader = new HRegionIncommon(region); for (int i = 0; i < compactionThreshold; i++) { @@ -185,7 +189,7 @@ public class TestMobCompaction { @Test public void testMobCompactionWithBulkload() throws Exception { // The following will produce store files of 600. - init(300); + init(UTIL.getConfiguration(), 300); byte[] dummyData = makeDummyData(600); Path hbaseRootDir = FSUtils.getRootDir(conf); @@ -216,6 +220,62 @@ public class TestMobCompaction { assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles()); } + /** + * Tests the major compaction when the zk is not connected. + * After that the major compaction will be marked as retainDeleteMarkers, the delete marks + * will be retained. 
+ * @throws Exception + */ + @Test + public void testMajorCompactionWithZKError() throws Exception { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // use the wrong zk settings + conf.setInt("zookeeper.recovery.retry", 0); + conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 100); + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181) - 1); + init(conf, 200); + byte[] dummyData = makeDummyData(300); // larger than mob threshold + HRegionIncommon loader = new HRegionIncommon(region); + byte[] deleteRow = Bytes.toBytes(0); + for (int i = 0; i < compactionThreshold - 1 ; i++) { + Put p = new Put(Bytes.toBytes(i)); + p.setDurability(Durability.SKIP_WAL); + p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData); + loader.put(p); + loader.flushcache(); + } + Delete delete = new Delete(deleteRow); + delete.deleteFamily(COLUMN_FAMILY); + region.delete(delete); + loader.flushcache(); + + assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles()); + region.compactStores(true); + assertEquals("After compaction: store files", 1, countStoreFiles()); + + Scan scan = new Scan(); + scan.setRaw(true); + InternalScanner scanner = region.getScanner(scan); + List<Cell> results = new ArrayList<Cell>(); + scanner.next(results); + int deleteCount = 0; + while (!results.isEmpty()) { + for (Cell c : results) { + if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { + deleteCount++; + assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow)); + } + } + results.clear(); + scanner.next(results); + } + // assert the delete mark is retained, the major compaction is marked as + // retainDeleteMarkers. + assertEquals(1, deleteCount); + scanner.close(); + } + private int countStoreFiles() throws IOException { Store store = region.getStore(COLUMN_FAMILY); return store.getStorefilesCount();
