http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java ---------------------------------------------------------------------- diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 40c5651,5335bef..13f8163 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@@ -262,12 -252,10 +258,12 @@@ public class HColumnDescriptor implemen DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE)); DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN)); for (String s : DEFAULT_VALUES.keySet()) { - RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); + RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); } - RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(ENCRYPTION))); - RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(ENCRYPTION_KEY))); - RESERVED_KEYWORDS.add(new ImmutableBytesWritable(IS_MOB_BYTES)); - RESERVED_KEYWORDS.add(new ImmutableBytesWritable(MOB_THRESHOLD_BYTES)); - RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION))); - RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); ++ RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION))); ++ RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); ++ RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES)); ++ RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES)); } private static final int UNINITIALIZED = -1;
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java ---------------------------------------------------------------------- diff --cc hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java index 0000000,976876cf..1180954 mode 000000,100644..100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java @@@ -1,0 -1,120 +1,138 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + + import org.apache.hadoop.hbase.exceptions.DeserializationException; + import org.apache.hadoop.hbase.io.compress.Compression; + import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; + import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; + import org.apache.hadoop.hbase.regionserver.BloomType; ++import org.apache.hadoop.hbase.util.Bytes; ++import org.apache.hadoop.hbase.util.PrettyPrinter; + import org.apache.hadoop.hbase.testclassification.MiscTests; + import org.apache.hadoop.hbase.testclassification.SmallTests; + import org.apache.hadoop.hbase.util.BuilderStyleTest; + import org.junit.Test; + import org.junit.experimental.categories.Category; + + /** Tests the HColumnDescriptor with appropriate arguments */ + @Category({MiscTests.class, SmallTests.class}) + public class TestHColumnDescriptor { + @Test + public void testPb() throws DeserializationException { + HColumnDescriptor hcd = new HColumnDescriptor( + new HColumnDescriptor(HConstants.CATALOG_FAMILY) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBloomFilterType(BloomType.NONE) + .setCacheDataInL1(true)); + final int v = 123; + hcd.setBlocksize(v); + hcd.setTimeToLive(v); + hcd.setBlockCacheEnabled(!HColumnDescriptor.DEFAULT_BLOCKCACHE); + hcd.setValue("a", "b"); + hcd.setMaxVersions(v); + assertEquals(v, hcd.getMaxVersions()); + hcd.setMinVersions(v); + assertEquals(v, hcd.getMinVersions()); + hcd.setKeepDeletedCells(KeepDeletedCells.TRUE); + hcd.setInMemory(!HColumnDescriptor.DEFAULT_IN_MEMORY); + boolean inmemory = hcd.isInMemory(); + hcd.setScope(v); + hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); + hcd.setBloomFilterType(BloomType.ROW); + hcd.setCompressionType(Algorithm.SNAPPY); ++ hcd.setMobEnabled(true); ++ hcd.setMobThreshold(1000L); + + + byte [] bytes = hcd.toByteArray(); + HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes); + assertTrue(hcd.equals(deserializedHcd)); + assertEquals(v, hcd.getBlocksize()); + assertEquals(v, hcd.getTimeToLive()); + assertEquals(hcd.getValue("a"), deserializedHcd.getValue("a")); + assertEquals(hcd.getMaxVersions(), deserializedHcd.getMaxVersions()); + assertEquals(hcd.getMinVersions(), deserializedHcd.getMinVersions()); + assertEquals(hcd.getKeepDeletedCells(), deserializedHcd.getKeepDeletedCells()); + assertEquals(inmemory, deserializedHcd.isInMemory()); + assertEquals(hcd.getScope(), deserializedHcd.getScope()); + assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY)); + assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF)); + assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW)); ++ assertEquals(hcd.isMobEnabled(), deserializedHcd.isMobEnabled()); ++ assertEquals(hcd.getMobThreshold(), deserializedHcd.getMobThreshold()); + } + + @Test + /** Tests HColumnDescriptor with empty familyName*/ + public void testHColumnDescriptorShouldThrowIAEWhenFamiliyNameEmpty() + throws Exception { + try { + new HColumnDescriptor("".getBytes()); + } catch (IllegalArgumentException e) { + assertEquals("Family name can not be empty", e.getLocalizedMessage()); + } + } + + /** + * Test that we add and remove strings from configuration properly. + */ + @Test + public void testAddGetRemoveConfiguration() throws Exception { + HColumnDescriptor desc = new HColumnDescriptor("foo"); + String key = "Some"; + String value = "value"; + desc.setConfiguration(key, value); + assertEquals(value, desc.getConfigurationValue(key)); + desc.removeConfiguration(key); + assertEquals(null, desc.getConfigurationValue(key)); + } + + @Test ++ public void testMobValuesInHColumnDescriptorShouldReadable() { ++ boolean isMob = true; ++ long threshold = 1000; ++ String isMobString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)), ++ HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB)); ++ String thresholdString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)), ++ HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD)); ++ assertEquals(String.valueOf(isMob), isMobString); ++ assertEquals(String.valueOf(threshold), thresholdString); ++ } ++ ++ @Test + public void testClassMethodsAreBuilderStyle() { + /* HColumnDescriptor should have a builder style setup where setXXX/addXXX methods + * can be chainable together: + * . For example: + * HColumnDescriptor hcd + * = new HColumnDescriptor() + * .setFoo(foo) + * .setBar(bar) + * .setBuz(buz) + * + * This test ensures that all methods starting with "set" returns the declaring object + */ + + BuilderStyleTest.assertClassesAreBuilderStyle(HColumnDescriptor.class); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java ---------------------------------------------------------------------- diff --cc hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index 8613a35,2095b7a..7e200df --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@@ -26,10 -26,9 +26,14 @@@ public final class TagType // Please declare new Tag Types here to avoid step on pre-existing tag types. public static final byte ACL_TAG_TYPE = (byte) 1; public static final byte VISIBILITY_TAG_TYPE = (byte) 2; - public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; + // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4; + + // mob tags + public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5; + public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6; ++ + // String based tag type used in replication + public static final byte STRING_VIS_TAG_TYPE = (byte) 7; + public static final byte TTL_TAG_TYPE = (byte)8; } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --cc hbase-common/src/main/resources/hbase-default.xml index d1429ad,2985685..4ba6d69 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@@ -1454,118 -1462,12 +1462,126 @@@ possible configurations would overwhel <name>hbase.http.staticuser.user</name> <value>dr.stack</value> </property> + <!-- Mob properties. --> + <property> + <name>hbase.mob.file.cache.size</name> + <value>1000</value> + <description> + Number of opened file handlers to cache. + A larger value will benefit reads by providing more file handlers per mob + file cache and would reduce frequent file opening and closing. + However, if this is set too high, this could lead to a "too many opened file handlers" + The default value is 1000. + </description> + </property> + <property> + <name>hbase.mob.cache.evict.period</name> + <value>3600</value> + <description> + The amount of time in seconds before the mob cache evicts cached mob files. + The default value is 3600 seconds. + </description> + </property> + <property> + <name>hbase.mob.cache.evict.remain.ratio</name> + <value>0.5f</value> + <description> + The ratio (between 0.0 and 1.0) of files that remains cached after an eviction + is triggered when the number of cached mob files exceeds the hbase.mob.file.cache.size. + The default value is 0.5f. + </description> + </property> + <property> + <name>hbase.mob.sweep.tool.compaction.ratio</name> + <value>0.5f</value> + <description> + If there're too many cells deleted in a mob file, it's regarded + as an invalid file and needs to be merged. + If existingCellsSize/mobFileSize is less than ratio, it's regarded + as an invalid file. The default value is 0.5f. + </description> + </property> + <property> + <name>hbase.mob.sweep.tool.compaction.mergeable.size</name> + <value>134217728</value> + <description> + If the size of a mob file is less than this value, it's regarded as a small + file and needs to be merged. The default value is 128MB. + </description> + </property> + <property> + <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name> + <value>134217728</value> + <description> + The flush size for the memstore used by sweep job. Each sweep reducer owns such a memstore. + The default value is 128MB. + </description> + </property> + <property> + <name>hbase.master.mob.ttl.cleaner.period</name> + <value>86400000</value> + <description> + The period that ExpiredMobFileCleanerChore runs. The unit is millisecond. + The default value is one day. + </description> + </property> + <property> + <name>hbase.mob.file.compaction.mergeable.threshold</name> + <value>201326592</value> + <description> + If the size of a mob file is less than this value, it's regarded as a small + file and needs to be merged in mob file compaction. The default value is 192MB. + </description> + </property> + <property> + <name>hbase.mob.delfile.max.count</name> + <value>3</value> + <description> + The max number of del files that is allowed in the mob file compaction. + In the mob file compaction, when the number of existing del files is larger than + this value, they are merged until number of del files is not larger this value. + The default value is 3. + </description> + </property> + <property> + <name>hbase.mob.file.compaction.batch.size</name> + <value>100</value> + <description> + The max number of the mob files that is allowed in a batch of the mob file compaction. + The mob file compaction merges the small mob files to bigger ones. If the number of the + small files is very large, it could lead to a "too many opened file handlers" in the merge. + And the merge has to be split into batches. This value limits the number of mob files + that are selected in a batch of the mob file compaction. The default value is 100. + </description> + </property> + <property> + <name>hbase.master.mob.file.compaction.chore.period</name> + <value>604800000</value> + <description> + The period that MobFileCompactionChore runs. The unit is millisecond. + The default value is one week. + </description> + </property> + <property> + <name>hbase.mob.file.compactor.class</name> + <value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value> + <description> + Implementation of mob file compactor, the default one is PartitionedMobFileCompactor. + </description> + </property> + <property> + <name>hbase.master.mob.file.compaction.chore.threads.max</name> + <value>1</value> + <description> + The max number of threads used in MobFileCompactionChore. + </description> + </property> + <property> + <name>hbase.regionserver.handler.abort.on.error.percent</name> + <value>0.5</value> + <description>The percent of region server RPC threads failed to abort RS. + -1 Disable aborting; 0 Abort if even a single handler has died; + 0.x Abort only when this percent of handlers have died; + 1 Abort only all of the handers have died.</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java ---------------------------------------------------------------------- diff --cc hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index ee2dcbb,b86a8eb..90a9a09 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@@ -233,37 -257,21 +257,53 @@@ public interface MetricsRegionServerSou String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize"; String MAJOR_COMPACTED_CELLS_SIZE_DESC = "The total amount of data processed during major compactions, in bytes"; + String MOB_COMPACTED_INTO_MOB_CELLS_COUNT = "mobCompactedIntoMobCellsCount"; + String MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC = + "The number of cells moved to mob during compaction"; + String MOB_COMPACTED_FROM_MOB_CELLS_COUNT = "mobCompactedFromMobCellsCount"; + String MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC = + "The number of cells moved from mob during compaction"; + String MOB_COMPACTED_INTO_MOB_CELLS_SIZE = "mobCompactedIntoMobCellsSize"; + String MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC = + "The total amount of cells move to mob during compaction, in bytes"; + String MOB_COMPACTED_FROM_MOB_CELLS_SIZE = "mobCompactedFromMobCellsSize"; + String MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC = + "The total amount of cells move from mob during compaction, in bytes"; + String MOB_FLUSH_COUNT = "mobFlushCount"; + String MOB_FLUSH_COUNT_DESC = "The number of the flushes in mob-enabled stores"; + String MOB_FLUSHED_CELLS_COUNT = "mobFlushedCellsCount"; + String MOB_FLUSHED_CELLS_COUNT_DESC = "The number of mob cells flushed to disk"; + String MOB_FLUSHED_CELLS_SIZE = "mobFlushedCellsSize"; + String MOB_FLUSHED_CELLS_SIZE_DESC = "The total amount of mob cells flushed to disk, in bytes"; + String MOB_SCAN_CELLS_COUNT = "mobScanCellsCount"; + String MOB_SCAN_CELLS_COUNT_DESC = "The number of scanned mob cells"; + String MOB_SCAN_CELLS_SIZE = "mobScanCellsSize"; + String MOB_SCAN_CELLS_SIZE_DESC = "The total amount of scanned mob cells, in bytes"; + String MOB_FILE_CACHE_ACCESS_COUNT = "mobFileCacheAccessCount"; + String MOB_FILE_CACHE_ACCESS_COUNT_DESC = "The count of accesses to the mob file cache"; + String MOB_FILE_CACHE_MISS_COUNT = "mobFileCacheMissCount"; + String MOB_FILE_CACHE_MISS_COUNT_DESC = "The count of misses to the mob file cache"; + String MOB_FILE_CACHE_HIT_PERCENT = "mobFileCacheHitPercent"; + String MOB_FILE_CACHE_HIT_PERCENT_DESC = "The hit percent to the mob file cache"; + String MOB_FILE_CACHE_EVICTED_COUNT = "mobFileCacheEvictedCount"; + String MOB_FILE_CACHE_EVICTED_COUNT_DESC = "The number of items evicted from the mob file cache"; + String MOB_FILE_CACHE_COUNT = "mobFileCacheCount"; + String MOB_FILE_CACHE_COUNT_DESC = "The count of cached mob files"; + String HEDGED_READS = "hedgedReads"; + String HEDGED_READS_DESC = "The number of times we started a hedged read"; + String HEDGED_READ_WINS = "hedgedReadWins"; + String HEDGED_READ_WINS_DESC = + "The number of times we started a hedged read and a hedged read won"; + + String BLOCKED_REQUESTS_COUNT = "blockedRequestCount"; + String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests because of memstore size is " + + "larger than blockingMemStoreSize"; + + String SPLIT_KEY = "splitTime"; + String SPLIT_REQUEST_KEY = "splitRequestCount"; + String SPLIT_REQUEST_DESC = "Number of splits requested"; + String SPLIT_SUCCESS_KEY = "splitSuccessCounnt"; + String SPLIT_SUCCESS_DESC = "Number of successfully executed splits"; + String FLUSH_KEY = "flushTime"; } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java ---------------------------------------------------------------------- diff --cc hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index ea55fe8,9634be7..2aad115 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@@ -248,72 -253,17 +253,88 @@@ public interface MetricsRegionServerWra long getMajorCompactedCellsSize(); /** ++<<<<<<< HEAD + * Gets the number of cells move to mob during compaction. + */ + long getMobCompactedIntoMobCellsCount(); + + /** + * Gets the number of cells move from mob during compaction. + */ + long getMobCompactedFromMobCellsCount(); + + /** + * Gets the total amount of cells move to mob during compaction, in bytes. + */ + long getMobCompactedIntoMobCellsSize(); + + /** + * Gets the total amount of cells move from mob during compaction, in bytes. + */ + long getMobCompactedFromMobCellsSize(); + + /** + * Gets the number of the flushes in mob-enabled stores. + */ + long getMobFlushCount(); + + /** + * Gets the number of mob cells flushed to disk. + */ + long getMobFlushedCellsCount(); + + /** + * Gets the total amount of mob cells flushed to disk, in bytes. + */ + long getMobFlushedCellsSize(); + + /** + * Gets the number of scanned mob cells. + */ + long getMobScanCellsCount(); + + /** + * Gets the total amount of scanned mob cells, in bytes. + */ + long getMobScanCellsSize(); + + /** + * Gets the count of accesses to the mob file cache. + */ + long getMobFileCacheAccessCount(); + + /** + * Gets the count of misses to the mob file cache. + */ + long getMobFileCacheMissCount(); + + /** + * Gets the number of items evicted from the mob file cache. + */ + long getMobFileCacheEvictedCount(); + + /** + * Gets the count of cached mob files. + */ + long getMobFileCacheCount(); + + /** + * Gets the hit percent to the mob file cache. + */ + int getMobFileCacheHitPercent(); ++ ++ /** + * @return Count of hedged read operations + */ + public long getHedgedReadOps(); + + /** + * @return Count of times a hedged read beat out the primary read. + */ + public long getHedgedReadWins(); + + /** + * @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize + */ + public long getBlockedRequestsCount(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java ---------------------------------------------------------------------- diff --cc hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 27d5437,2c1dcd3..45e2699 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@@ -219,35 -249,14 +249,44 @@@ public class MetricsRegionServerSourceI rsWrap.getCompactedCellsSize()) .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC), rsWrap.getMajorCompactedCellsSize()) + + .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_COUNT, MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC), + rsWrap.getMobCompactedFromMobCellsCount()) + .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_COUNT, MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC), + rsWrap.getMobCompactedIntoMobCellsCount()) + .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_SIZE, MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC), + rsWrap.getMobCompactedFromMobCellsSize()) + .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_SIZE, MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC), + rsWrap.getMobCompactedIntoMobCellsSize()) + .addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC), + rsWrap.getMobFlushCount()) + .addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, MOB_FLUSHED_CELLS_COUNT_DESC), + rsWrap.getMobFlushedCellsCount()) + .addCounter(Interns.info(MOB_FLUSHED_CELLS_SIZE, MOB_FLUSHED_CELLS_SIZE_DESC), + rsWrap.getMobFlushedCellsSize()) + .addCounter(Interns.info(MOB_SCAN_CELLS_COUNT, MOB_SCAN_CELLS_COUNT_DESC), + rsWrap.getMobScanCellsCount()) + .addCounter(Interns.info(MOB_SCAN_CELLS_SIZE, MOB_SCAN_CELLS_SIZE_DESC), + rsWrap.getMobScanCellsSize()) + .addGauge(Interns.info(MOB_FILE_CACHE_COUNT, MOB_FILE_CACHE_COUNT_DESC), + rsWrap.getMobFileCacheCount()) + .addCounter(Interns.info(MOB_FILE_CACHE_ACCESS_COUNT, MOB_FILE_CACHE_ACCESS_COUNT_DESC), + rsWrap.getMobFileCacheAccessCount()) + .addCounter(Interns.info(MOB_FILE_CACHE_MISS_COUNT, MOB_FILE_CACHE_MISS_COUNT_DESC), + rsWrap.getMobFileCacheMissCount()) + .addCounter( + Interns.info(MOB_FILE_CACHE_EVICTED_COUNT, MOB_FILE_CACHE_EVICTED_COUNT_DESC), + rsWrap.getMobFileCacheEvictedCount()) + .addGauge(Interns.info(MOB_FILE_CACHE_HIT_PERCENT, MOB_FILE_CACHE_HIT_PERCENT_DESC), + rsWrap.getMobFileCacheHitPercent()) ++ + .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps()) + .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC), + rsWrap.getHedgedReadWins()) + + .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC), + rsWrap.getBlockedRequestsCount()) + .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), rsWrap.getZookeeperQuorum()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java ---------------------------------------------------------------------- diff --cc hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java index f8a4412,0000000..85c01cc mode 100644,000000..100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java @@@ -1,154 -1,0 +1,155 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HBaseAdmin; ++import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithMOB; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Integration Test for MOB ingest. + */ +@Category(IntegrationTests.class) +public class IntegrationTestIngestWithMOB extends IntegrationTestIngest { + private static final char COLON = ':'; + + private byte[] mobColumnFamily = LoadTestTool.COLUMN_FAMILY; + public static final String THRESHOLD = "threshold"; + public static final String MIN_MOB_DATA_SIZE = "minMobDataSize"; + public static final String MAX_MOB_DATA_SIZE = "maxMobDataSize"; + private int threshold = 1024; // 1KB + private int minMobDataSize = 512; // 512B + private int maxMobDataSize = threshold * 5; // 5KB + private static final long JUNIT_RUN_TIME = 2 * 60 * 1000; // 2 minutes + + //similar to LOAD_TEST_TOOL_INIT_ARGS except OPT_IN_MEMORY is removed + protected String[] LOAD_TEST_TOOL_MOB_INIT_ARGS = { + LoadTestTool.OPT_COMPRESSION, + LoadTestTool.OPT_DATA_BLOCK_ENCODING, + LoadTestTool.OPT_ENCRYPTION, + LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, + LoadTestTool.OPT_REGION_REPLICATION, + }; + + @Override + protected String[] getArgsForLoadTestToolInitTable() { + List<String> args = new ArrayList<String>(); + args.add("-tn"); + args.add(getTablename().getNameAsString()); + // pass all remaining args from conf with keys <test class name>.<load test tool arg> + String clazz = this.getClass().getSimpleName(); + for (String arg : LOAD_TEST_TOOL_MOB_INIT_ARGS) { + String val = conf.get(String.format("%s.%s", clazz, arg)); + if (val != null) { + args.add("-" + arg); + args.add(val); + } + } + args.add("-init_only"); + return args.toArray(new String[args.size()]); + } + + @Override + protected void addOptions() { + super.addOptions(); + super.addOptWithArg(THRESHOLD, "The threshold to classify cells to mob data"); + super.addOptWithArg(MIN_MOB_DATA_SIZE, "Minimum value size for mob data"); + super.addOptWithArg(MAX_MOB_DATA_SIZE, "Maximum value size for mob data"); + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + if (cmd.hasOption(THRESHOLD)) { + threshold = Integer.parseInt(cmd.getOptionValue(THRESHOLD)); + } + if (cmd.hasOption(MIN_MOB_DATA_SIZE)) { + minMobDataSize = Integer.parseInt(cmd.getOptionValue(MIN_MOB_DATA_SIZE)); + } + if (cmd.hasOption(MAX_MOB_DATA_SIZE)) { + maxMobDataSize = Integer.parseInt(cmd.getOptionValue(MAX_MOB_DATA_SIZE)); + } + if (minMobDataSize > maxMobDataSize) { + throw new IllegalArgumentException( + "The minMobDataSize should not be larger than minMobDataSize"); + } + } + + @Test + public void testIngest() throws Exception { + runIngestTest(JUNIT_RUN_TIME, 100, 10, 1024, 10, 20); + }; + + @Override + protected void initTable() throws IOException { + super.initTable(); + + byte[] tableName = getTablename().getName(); + HBaseAdmin admin = new HBaseAdmin(conf); + HTableDescriptor tableDesc = admin.getTableDescriptor(tableName); + LOG.info("Disabling table " + getTablename()); + admin.disableTable(tableName); + for (HColumnDescriptor columnDescriptor : tableDesc.getFamilies()) { + if(Arrays.equals(columnDescriptor.getName(), mobColumnFamily)) { + columnDescriptor.setMobEnabled(true); + columnDescriptor.setMobThreshold((long) threshold); + admin.modifyColumn(tableName, columnDescriptor); + } + } + LOG.info("Enabling table " + getTablename()); + admin.enableTable(tableName); + admin.close(); + } + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys); + List<String> tmp = new ArrayList<String>(Arrays.asList(args)); + // LoadTestDataGeneratorMOB:mobColumnFamily:minMobDataSize:maxMobDataSize + tmp.add(HIPHEN + LoadTestTool.OPT_GENERATOR); + StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithMOB.class.getName()); + sb.append(COLON); + sb.append(Bytes.toString(mobColumnFamily)); + sb.append(COLON); + sb.append(minMobDataSize); + sb.append(COLON); + sb.append(maxMobDataSize); + tmp.add(sb.toString()); + return tmp.toArray(new String[tmp.size()]); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestIngestWithMOB(), args); + System.exit(ret); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java index 6e97a76,ff33951..a950dce --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java @@@ -94,37 -92,41 +94,45 @@@ public class HFileLink extends FileLin private final Path tempPath; /** - * @param conf {@link Configuration} from which to extract specific archive locations - * @param path The path of the HFile Link. - * @throws IOException on unexpected error. + * Dead simple hfile link constructor */ - public HFileLink(Configuration conf, Path path) throws IOException { - this(FSUtils.getRootDir(conf), HFileArchiveUtil.getArchivePath(conf), path); - public HFileLink(final Path originPath, final Path tempPath, ++ public HFileLink(final Path originPath, final Path tempPath, final Path mobPath, + final Path archivePath) { - this.tempPath = tempPath; ++ this.tempPath = tempPath; + this.originPath = originPath; ++ this.mobPath = mobPath; + this.archivePath = archivePath; - - setLocations(originPath, tempPath, archivePath); ++ setLocations(originPath, tempPath, mobPath, archivePath); } ++ /** - * @param rootDir Path to the root directory where hbase files are stored - * @param archiveDir Path to the hbase archive directory - * @param mobDir path to the hbase mob directory - * @param path The path of the HFile Link. + * @param conf {@link Configuration} from which to extract specific archive locations + * @param hFileLinkPattern The path ending with a HFileLink pattern. (table=region-hfile) + * @throws IOException on unexpected error. */ - public HFileLink(final Path rootDir, final Path archiveDir, final Path mobDir, final Path path) { - Path hfilePath = getRelativeTablePath(path); - this.tempPath = new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY), hfilePath); - this.originPath = new Path(rootDir, hfilePath); - this.mobPath = new Path(mobDir, hfilePath); - this.archivePath = new Path(archiveDir, hfilePath); - setLocations(originPath, mobPath, tempPath, archivePath); - } + public static final HFileLink buildFromHFileLinkPattern(Configuration conf, Path hFileLinkPattern) + throws IOException { + return buildFromHFileLinkPattern(FSUtils.getRootDir(conf), + HFileArchiveUtil.getArchivePath(conf), hFileLinkPattern); + } + - /** ++ + /** * @param rootDir Path to the root directory where hbase files are stored * @param archiveDir Path to the hbase archive directory - * @param path The path of the HFile Link. + * @param hFileLinkPattern The path of the HFile Link. */ - public HFileLink(final Path rootDir, final Path archiveDir, final Path path) { - this(rootDir, archiveDir, new Path(rootDir, MobConstants.MOB_DIR_NAME), path); + public final static HFileLink buildFromHFileLinkPattern(final Path rootDir, + final Path archiveDir, + final Path hFileLinkPattern) { + Path hfilePath = getHFileLinkPatternRelativePath(hFileLinkPattern); + Path tempPath = new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY), hfilePath); + Path originPath = new Path(rootDir, hfilePath); ++ Path mobPath = new Path(new Path(rootDir, MobConstants.MOB_DIR_NAME), hfilePath); + Path archivePath = new Path(archiveDir, hfilePath); - return new HFileLink(originPath, tempPath, archivePath); ++ return new HFileLink(originPath, tempPath, mobPath, archivePath); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java index cc0d0aa,0000000..427f2cd mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@@ -1,103 -1,0 +1,104 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; - import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; ++import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.exceptions.LockTimeoutException; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired + * mob files. + */ [email protected] - public class ExpiredMobFileCleanerChore extends Chore { ++public class ExpiredMobFileCleanerChore extends ScheduledChore { + + private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class); + private final HMaster master; + private TableLockManager tableLockManager; + private ExpiredMobFileCleaner cleaner; + + public ExpiredMobFileCleanerChore(HMaster master) { - super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt( - MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master); ++ super(master.getServerName() + "-ExpiredMobFileCleanerChore", master, ++ master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, ++ MobConstants.DEFAULT_MOB_CLEANER_PERIOD)); + this.master = master; + this.tableLockManager = master.getTableLockManager(); + cleaner = new ExpiredMobFileCleaner(); + } + + @Override + protected void chore() { + try { + TableDescriptors htds = master.getTableDescriptors(); + Map<String, HTableDescriptor> map = htds.getAll(); + for (HTableDescriptor htd : map.values()) { + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { + // clean only for mob-enabled column. + // obtain a read table lock before cleaning, synchronize with MobFileCompactionChore. + boolean tableLocked = false; + TableLock lock = null; + try { + // the tableLockManager might be null in testing. In that case, it is lock-free. + if (tableLockManager != null) { + lock = tableLockManager.readLock(MobUtils.getTableLockName(htd.getTableName()), + "Run ExpiredMobFileCleanerChore"); + lock.acquire(); + } + tableLocked = true; + cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd); + } catch (LockTimeoutException e) { + LOG.info("Fail to acquire the lock because of timeout, maybe a" + + " MobFileCompactor is running", e); + } catch (Exception e) { + LOG.error( + "Fail to clean the expired mob files for the column " + hcd.getNameAsString() + + " in the table " + htd.getNameAsString(), e); + } finally { + if (lock != null && tableLocked) { + try { + lock.release(); + } catch (IOException e) { + LOG.error( + "Fail to release the write lock for the table " + htd.getNameAsString(), e); + } + } + } + } + } + } + } catch (Exception e) { + LOG.error("Fail to clean the expired mob files", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7ad49a3,61a1c66..aea7e7f --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@@ -612,11 -767,6 +769,12 @@@ public class HMaster extends HRegionSer // master initialization. See HBASE-5916. this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer(); + this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this); - Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread()); ++ getChoreService().scheduleChore(expiredMobFileCleanerChore); ++ + this.mobFileCompactChore = new MobFileCompactionChore(this); - Threads.setDaemonThreadRunning(mobFileCompactChore.getThread()); ++ getChoreService().scheduleChore(mobFileCompactChore); + if (this.cpHost != null) { // don't let cp initialization errors kill the master try { @@@ -863,23 -1061,17 +1069,23 @@@ } private void stopChores() { + if (this.expiredMobFileCleanerChore != null) { - this.expiredMobFileCleanerChore.interrupt(); ++ this.expiredMobFileCleanerChore.cancel(true); + } + if (this.mobFileCompactChore != null) { - this.mobFileCompactChore.interrupt(); ++ this.mobFileCompactChore.cancel(true); + } if (this.balancerChore != null) { - this.balancerChore.interrupt(); + this.balancerChore.cancel(true); } if (this.clusterStatusChore != null) { - this.clusterStatusChore.interrupt(); + this.clusterStatusChore.cancel(true); } if (this.catalogJanitorChore != null) { - this.catalogJanitorChore.interrupt(); + this.catalogJanitorChore.cancel(true); } if (this.clusterStatusPublisherChore != null){ - clusterStatusPublisherChore.interrupt(); + clusterStatusPublisherChore.cancel(true); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java index 12491af,0000000..ce2df81 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java @@@ -1,166 -1,0 +1,166 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; - import org.apache.hadoop.hbase.Chore; ++import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.LockTimeoutException; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Threads; + +/** + * The Class MobFileCompactChore for running compaction regularly to merge small mob files. + */ [email protected] - public class MobFileCompactionChore extends Chore{ ++public class MobFileCompactionChore extends ScheduledChore { + + private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class); + private HMaster master; + private TableLockManager tableLockManager; + private ExecutorService pool; + + public MobFileCompactionChore(HMaster master) { - super(master.getServerName() + "-MobFileCompactChore", master.getConfiguration().getInt( - MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD, - MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master); ++ super(master.getServerName() + "-MobFileCompactChore", master, ++ master.getConfiguration().getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD, ++ MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD)); + this.master = master; + this.tableLockManager = master.getTableLockManager(); + this.pool = createThreadPool(); + } + + @Override + protected void chore() { + try { + String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY, + PartitionedMobFileCompactor.class.getName()); + TableDescriptors htds = master.getTableDescriptors(); + Map<String, HTableDescriptor> map = htds.getAll(); + for (HTableDescriptor htd : map.values()) { + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + if (!hcd.isMobEnabled()) { + continue; + } + // instantiate the mob file compactor. + MobFileCompactor compactor = null; + try { + compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class, + ExecutorService.class }, + new Object[] { master.getConfiguration(), master.getFileSystem(), htd.getTableName(), + hcd, pool }); + } catch (Exception e) { + throw new IOException("Unable to load configured mob file compactor '" + className + + "'", e); + } + // compact only for mob-enabled column. + // obtain a write table lock before performing compaction to avoid race condition + // with major compaction in mob-enabled column. + boolean tableLocked = false; + TableLock lock = null; + try { + // the tableLockManager might be null in testing. In that case, it is lock-free. + if (tableLockManager != null) { + lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()), + "Run MobFileCompactChore"); + lock.acquire(); + } + tableLocked = true; + compactor.compact(); + } catch (LockTimeoutException e) { + LOG.info("Fail to acquire the lock because of timeout, maybe a major compaction or an" + + " ExpiredMobFileCleanerChore is running", e); + } catch (Exception e) { + LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString() + + " in the table " + htd.getNameAsString(), e); + } finally { + if (lock != null && tableLocked) { + try { + lock.release(); + } catch (IOException e) { + LOG.error( + "Fail to release the write lock for the table " + htd.getNameAsString(), e); + } + } + } + } + } + } catch (Exception e) { + LOG.error("Fail to clean the expired mob files", e); + } + } + + @Override + protected void cleanup() { + super.cleanup(); + pool.shutdown(); + } + + /** + * Creates a thread pool. + * @return A thread pool. + */ + private ExecutorService createThreadPool() { + Configuration conf = master.getConfiguration(); + int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX); + if (maxThreads == 0) { + maxThreads = 1; + } + long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME); + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 95e89d2,092a17d..0664a55 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@@ -24,29 -25,30 +25,28 @@@ import java.util.List import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; --import org.apache.hadoop.hbase.CoordinatedStateException; - import org.apache.hadoop.hbase.HColumnDescriptor; - import org.apache.hadoop.hbase.TableName; --import org.apache.hadoop.hbase.HRegionInfo; --import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; --import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.backup.HFileArchiver; - import org.apache.hadoop.hbase.MetaTableAccessor; + import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Delete; + import org.apache.hadoop.hbase.client.Result; + import org.apache.hadoop.hbase.client.ResultScanner; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.executor.EventType; - import org.apache.hadoop.hbase.regionserver.HRegion; - import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; - import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.master.RegionStates; + import org.apache.hadoop.hbase.regionserver.HRegion; ++import org.apache.hadoop.hbase.util.FSUtils; @InterfaceAudience.Private public class DeleteTableHandler extends TableEventHandler { @@@ -135,59 -165,37 +163,63 @@@ */ protected void removeTableData(final List<HRegionInfo> regions) throws IOException, CoordinatedStateException { - // 1. Remove regions from META - LOG.debug("Deleting regions from META"); - MetaTableAccessor.deleteRegions(this.server.getShortCircuitConnection(), regions); - - // ----------------------------------------------------------------------- - // NOTE: At this point we still have data on disk, but nothing in hbase:meta - // if the rename below fails, hbck will report an inconsistency. - // ----------------------------------------------------------------------- - - // 2. Move the table in /hbase/.tmp - MasterFileSystem mfs = this.masterServices.getMasterFileSystem(); - Path tempTableDir = mfs.moveTableToTemp(tableName); - - // 3. Archive regions from FS (temp directory) - FileSystem fs = mfs.getFileSystem(); - for (HRegionInfo hri: regions) { - LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS"); - HFileArchiver.archiveRegion(fs, mfs.getRootDir(), - tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); - } + try { + // 1. Remove regions from META + LOG.debug("Deleting regions from META"); + MetaTableAccessor.deleteRegions(this.server.getConnection(), regions); + + // ----------------------------------------------------------------------- + // NOTE: At this point we still have data on disk, but nothing in hbase:meta + // if the rename below fails, hbck will report an inconsistency. + // ----------------------------------------------------------------------- + + // 2. Move the table in /hbase/.tmp + MasterFileSystem mfs = this.masterServices.getMasterFileSystem(); + Path tempTableDir = mfs.moveTableToTemp(tableName); + + // 3. Archive regions from FS (temp directory) + FileSystem fs = mfs.getFileSystem(); + for (HRegionInfo hri : regions) { + LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS"); + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), + tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); + } - // Archive the mob data if there is a mob-enabled column - HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies(); - boolean hasMob = false; - for (HColumnDescriptor hcd : hcds) { - if (hcd.isMobEnabled()) { - hasMob = true; - break; ++ // Archive the mob data if there is a mob-enabled column ++ HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies(); ++ boolean hasMob = false; ++ for (HColumnDescriptor hcd : hcds) { ++ if (hcd.isMobEnabled()) { ++ hasMob = true; ++ break; ++ } + } - } - Path mobTableDir = null; - if (hasMob) { - // Archive mob data - mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), - tableName); - Path regionDir = - new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName()); - if (fs.exists(regionDir)) { - HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir); ++ Path mobTableDir = null; ++ if (hasMob) { ++ // Archive mob data ++ mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), ++ tableName); ++ Path regionDir = ++ new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName()); ++ if (fs.exists(regionDir)) { ++ HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir); ++ } + } - } - // 4. Delete table directory from FS (temp directory) - if (!fs.delete(tempTableDir, true)) { - LOG.error("Couldn't delete " + tempTableDir); - } - // Delete the table directory where the mob files are saved - if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) { - if (!fs.delete(mobTableDir, true)) { - LOG.error("Couldn't delete " + mobTableDir); + // 4. Delete table directory from FS (temp directory) + if (!fs.delete(tempTableDir, true)) { + LOG.error("Couldn't delete " + tempTableDir); + } ++ // Delete the table directory where the mob files are saved ++ if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) { ++ if (!fs.delete(mobTableDir, true)) { ++ LOG.error("Couldn't delete " + mobTableDir); ++ } + } - } - LOG.debug("Table '" + tableName + "' archived!"); + LOG.debug("Table '" + tableName + "' archived!"); + } finally { + cleanupTableState(); + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java index d91d825,e70e1c8..1f2ed0a --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java @@@ -25,10 -25,9 +25,10 @@@ import java.util.concurrent.ThreadPoolE import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.classification.InterfaceAudience; - import org.apache.hadoop.classification.InterfaceStability; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionReplicaUtil; http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java index ec0cfe5,0000000..c2abc7c mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@@ -1,304 -1,0 +1,308 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HMobStore; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; ++import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Compact passed set of files in the mob-enabled column family. + */ [email protected] +public class DefaultMobCompactor extends DefaultCompactor { + + private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class); + private long mobSizeThreshold; + private HMobStore mobStore; + public DefaultMobCompactor(Configuration conf, Store store) { + super(conf, store); + // The mob cells reside in the mob-enabled column family which is held by HMobStore. + // During the compaction, the compactor reads the cells from the mob files and + // probably creates new mob files. All of these operations are included in HMobStore, + // so we need to cast the Store to HMobStore. + if (!(store instanceof HMobStore)) { + throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); + } + mobStore = (HMobStore) store; + mobSizeThreshold = store.getFamily().getMobThreshold(); + } + + /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + @Override + protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException { + // make this writer with tags always because of possible new cells with tags. + StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, + true, fd.maxMVCCReadpoint >= smallestReadPoint, true); + return writer; + } + + @Override + protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + if (scanType == ScanType.COMPACT_DROP_DELETES) { + scanType = ScanType.COMPACT_RETAIN_DELETES; + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, true); + } else { + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, false); + } + } + ++ // TODO refactor to take advantage of the throughput controller. ++ + /** + * Performs compaction on a column family with the mob flag enabled. + * This is for when the mob threshold size has changed or if the mob + * column family mode has been toggled via an alter table statement. + * Compacts the files by the following rules. + * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * directly copy the (with mob tag) cell into the new store file. + * </li> + * <li> + * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into + * the new store file. + * </li> + * </ol> + * 2. If the cell doesn't have a reference tag. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * write this cell to a mob file, and write the path of this mob file to the store file. + * </li> + * <li> + * Otherwise, directly write this cell into the store file. + * </li> + * </ol> + * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner + * which could output the normal cells and delete markers together when required. + * After the major compaction on the normal hfiles, we have a guarantee that we have purged all + * deleted or old version mob refs, and the delete markers are written to a del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the + * mob files. When the small mob files are merged into bigger ones, the del file is added into + * the scanner to filter the deleted cells. + * @param fd File details + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point. + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param major Is a major compaction. + * @return Whether compaction ended; false if it was interrupted for any reason. + */ + @Override + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, - long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException { ++ long smallestReadPoint, boolean cleanSeqId, ++ CompactionThroughputController throughputController, boolean major) throws IOException { + if (!(scanner instanceof MobCompactionStoreScanner)) { + throw new IllegalArgumentException( + "The scanner should be an instance of MobCompactionStoreScanner"); + } + MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner; + int bytesWritten = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. + List<Cell> cells = new ArrayList<Cell>(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + int closeCheckInterval = HStore.getCloseCheckInterval(); + boolean hasMore; + Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); + byte[] fileName = null; + StoreFile.Writer mobFileWriter = null; + StoreFile.Writer delFileWriter = null; + long mobCells = 0; + long deleteMarkersCount = 0; + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() + .getName()); + long mobCompactedIntoMobCellsCount = 0; + long mobCompactedFromMobCellsCount = 0; + long mobCompactedIntoMobCellsSize = 0; + long mobCompactedFromMobCellsSize = 0; + try { + try { + // If the mob file writer could not be created, directly write the cell to the store file. + mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + } catch (IOException e) { + LOG.error( + "Fail to create mob writer, " + + "we will continue the compaction by writing MOB cells directly in store files", + e); + } + delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + do { + hasMore = compactionScanner.next(cells, compactionKVMax); + // output to writer: + for (Cell c : cells) { - // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk. - KeyValue kv = KeyValueUtil.ensureKeyValue(c); - resetSeqId(smallestReadPoint, cleanSeqId, kv); ++ if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { ++ CellUtil.setSequenceId(c, 0); ++ } + if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) { - delFileWriter.append(kv); ++ delFileWriter.append(c); + deleteMarkersCount++; - } else if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) { ++ } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + // If the mob file writer is null or the kv type is not put, directly write the cell + // to the store file. - writer.append(kv); - } else if (MobUtils.isMobReferenceCell(kv)) { - if (MobUtils.hasValidMobRefCellValue(kv)) { - int size = MobUtils.getMobValueLength(kv); ++ writer.append(c); ++ } else if (MobUtils.isMobReferenceCell(c)) { ++ if (MobUtils.hasValidMobRefCellValue(c)) { ++ int size = MobUtils.getMobValueLength(c); + if (size > mobSizeThreshold) { + // If the value size is larger than the threshold, it's regarded as a mob. Since + // its value is already in the mob file, directly write this cell to the store file - writer.append(kv); ++ writer.append(c); + } else { + // If the value is not larger than the threshold, it's not regarded a mob. Retrieve + // the mob cell from the mob file, and write it back to the store file. - Cell cell = mobStore.resolve(kv, false); - if (cell.getValueLength() != 0) { ++ Cell mobCell = mobStore.resolve(c, false); ++ if (mobCell.getValueLength() != 0) { + // put the mob data back to the store file - KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell); - mobKv.setSequenceId(kv.getSequenceId()); - writer.append(mobKv); ++ // KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell); ++ CellUtil.setSequenceId(mobCell, c.getSequenceId()); ++ writer.append(mobCell); + mobCompactedFromMobCellsCount++; - mobCompactedFromMobCellsSize += cell.getValueLength(); ++ mobCompactedFromMobCellsSize += mobCell.getValueLength(); + } else { + // If the value of a file is empty, there might be issues when retrieving, + // directly write the cell to the store file, and leave it to be handled by the + // next compaction. - writer.append(kv); ++ writer.append(c); + } + } + } else { - LOG.warn("The value format of the KeyValue " + kv ++ LOG.warn("The value format of the KeyValue " + c + + " is wrong, its length is less than " + Bytes.SIZEOF_INT); - writer.append(kv); ++ writer.append(c); + } - } else if (kv.getValueLength() <= mobSizeThreshold) { ++ } else if (c.getValueLength() <= mobSizeThreshold) { + // If the value size of a cell is not larger than the threshold, directly write it to + // the store file. - writer.append(kv); ++ writer.append(c); + } else { + // If the value size of a cell is larger than the threshold, it's regarded as a mob, + // write this cell to a mob file, and write the path to the store file. + mobCells++; + // append the original keyValue in the mob file. - mobFileWriter.append(kv); - KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag); ++ mobFileWriter.append(c); ++ KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag); + // write the cell whose value is the path of a mob file to the store file. + writer.append(reference); + mobCompactedIntoMobCellsCount++; - mobCompactedIntoMobCellsSize += kv.getValueLength(); ++ mobCompactedIntoMobCellsSize += c.getValueLength(); + } + ++progress.currentCompactedKVs; + + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { - bytesWritten += kv.getLength(); ++ bytesWritten += KeyValueUtil.length(c); + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } + } + } + } + cells.clear(); + } while (hasMore); + } finally { + if (mobFileWriter != null) { + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); + } + if (delFileWriter != null) { + delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); + delFileWriter.close(); + } + } + if (mobFileWriter != null) { + if (mobCells > 0) { + // If the mob file is not empty, commit it. + mobStore.commitFile(mobFileWriter.getPath(), path); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp mob file", e); + } + } + } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. + // If the commit fails, the compaction is re-performed again. + mobStore.commitFile(delFileWriter.getPath(), path); + } else { + try { + // If the del file is empty, delete it instead of committing. + store.getFileSystem().delete(delFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp del file", e); + } + } + } + mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount); + mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount); + mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize); + mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize); + progress.complete(); + return true; + } +}
