http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 40c5651,5335bef..13f8163
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@@ -262,12 -252,10 +258,12 @@@ public class HColumnDescriptor implemen
        DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, 
String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
        DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, 
String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
        for (String s : DEFAULT_VALUES.keySet()) {
-         RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
+         RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
        }
-       RESERVED_KEYWORDS.add(new 
ImmutableBytesWritable(Bytes.toBytes(ENCRYPTION)));
-       RESERVED_KEYWORDS.add(new 
ImmutableBytesWritable(Bytes.toBytes(ENCRYPTION_KEY)));
-       RESERVED_KEYWORDS.add(new ImmutableBytesWritable(IS_MOB_BYTES));
-       RESERVED_KEYWORDS.add(new ImmutableBytesWritable(MOB_THRESHOLD_BYTES));
 -    RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
 -    RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
++      RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
++      RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
++      RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES));
++      RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES));
    }
  
    private static final int UNINITIALIZED = -1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
----------------------------------------------------------------------
diff --cc 
hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index 0000000,976876cf..1180954
mode 000000,100644..100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@@ -1,0 -1,120 +1,138 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import org.apache.hadoop.hbase.exceptions.DeserializationException;
+ import org.apache.hadoop.hbase.io.compress.Compression;
+ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
++import org.apache.hadoop.hbase.util.Bytes;
++import org.apache.hadoop.hbase.util.PrettyPrinter;
+ import org.apache.hadoop.hbase.testclassification.MiscTests;
+ import org.apache.hadoop.hbase.testclassification.SmallTests;
+ import org.apache.hadoop.hbase.util.BuilderStyleTest;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ /** Tests the HColumnDescriptor with appropriate arguments */
+ @Category({MiscTests.class, SmallTests.class})
+ public class TestHColumnDescriptor {
+   @Test
+   public void testPb() throws DeserializationException {
+     HColumnDescriptor hcd = new HColumnDescriptor(
+         new HColumnDescriptor(HConstants.CATALOG_FAMILY)
+             .setInMemory(true)
+             .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+             .setBloomFilterType(BloomType.NONE)
+             .setCacheDataInL1(true));
+     final int v = 123;
+     hcd.setBlocksize(v);
+     hcd.setTimeToLive(v);
+     hcd.setBlockCacheEnabled(!HColumnDescriptor.DEFAULT_BLOCKCACHE);
+     hcd.setValue("a", "b");
+     hcd.setMaxVersions(v);
+     assertEquals(v, hcd.getMaxVersions());
+     hcd.setMinVersions(v);
+     assertEquals(v, hcd.getMinVersions());
+     hcd.setKeepDeletedCells(KeepDeletedCells.TRUE);
+     hcd.setInMemory(!HColumnDescriptor.DEFAULT_IN_MEMORY);
+     boolean inmemory = hcd.isInMemory();
+     hcd.setScope(v);
+     hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+     hcd.setBloomFilterType(BloomType.ROW);
+     hcd.setCompressionType(Algorithm.SNAPPY);
++    hcd.setMobEnabled(true);
++    hcd.setMobThreshold(1000L);
+ 
+ 
+     byte [] bytes = hcd.toByteArray();
+     HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
+     assertTrue(hcd.equals(deserializedHcd));
+     assertEquals(v, hcd.getBlocksize());
+     assertEquals(v, hcd.getTimeToLive());
+     assertEquals(hcd.getValue("a"), deserializedHcd.getValue("a"));
+     assertEquals(hcd.getMaxVersions(), deserializedHcd.getMaxVersions());
+     assertEquals(hcd.getMinVersions(), deserializedHcd.getMinVersions());
+     assertEquals(hcd.getKeepDeletedCells(), 
deserializedHcd.getKeepDeletedCells());
+     assertEquals(inmemory, deserializedHcd.isInMemory());
+     assertEquals(hcd.getScope(), deserializedHcd.getScope());
+     
assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
+     
assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
+     assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
++    assertEquals(hcd.isMobEnabled(), deserializedHcd.isMobEnabled());
++    assertEquals(hcd.getMobThreshold(), deserializedHcd.getMobThreshold());
+   }
+ 
+   @Test
+   /** Tests HColumnDescriptor with empty familyName*/
+   public void testHColumnDescriptorShouldThrowIAEWhenFamiliyNameEmpty()
+       throws Exception {
+     try {
+       new HColumnDescriptor("".getBytes());
+     } catch (IllegalArgumentException e) {
+       assertEquals("Family name can not be empty", e.getLocalizedMessage());
+     }
+   }
+ 
+   /**
+    * Test that we add and remove strings from configuration properly.
+    */
+   @Test
+   public void testAddGetRemoveConfiguration() throws Exception {
+     HColumnDescriptor desc = new HColumnDescriptor("foo");
+     String key = "Some";
+     String value = "value";
+     desc.setConfiguration(key, value);
+     assertEquals(value, desc.getConfigurationValue(key));
+     desc.removeConfiguration(key);
+     assertEquals(null, desc.getConfigurationValue(key));
+   }
+ 
+   @Test
++  public void testMobValuesInHColumnDescriptorShouldReadable() {
++    boolean isMob = true;
++    long threshold = 1000;
++    String isMobString = 
PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)),
++            HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB));
++    String thresholdString = 
PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)),
++            HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD));
++    assertEquals(String.valueOf(isMob), isMobString);
++    assertEquals(String.valueOf(threshold), thresholdString);
++  }
++
++  @Test
+   public void testClassMethodsAreBuilderStyle() {
+     /* HColumnDescriptor should have a builder style setup where 
setXXX/addXXX methods
+      * can be chainable together:
+      * . For example:
+      * HColumnDescriptor hcd
+      *   = new HColumnDescriptor()
+      *     .setFoo(foo)
+      *     .setBar(bar)
+      *     .setBuz(buz)
+      *
+      * This test ensures that all methods starting with "set" returns the 
declaring object
+      */
+ 
+     BuilderStyleTest.assertClassesAreBuilderStyle(HColumnDescriptor.class);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --cc hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 8613a35,2095b7a..7e200df
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@@ -26,10 -26,9 +26,14 @@@ public final class TagType 
    // Please declare new Tag Types here to avoid step on pre-existing tag 
types.
    public static final byte ACL_TAG_TYPE = (byte) 1;
    public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
-   public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
+   // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
    public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = 
(byte)4;
 +
 +  // mob tags
 +  public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5;
 +  public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6;
++
+   // String based tag type used in replication
+   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+   public static final byte TTL_TAG_TYPE = (byte)8;
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --cc hbase-common/src/main/resources/hbase-default.xml
index d1429ad,2985685..4ba6d69
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@@ -1454,118 -1462,12 +1462,126 @@@ possible configurations would overwhel
      <name>hbase.http.staticuser.user</name>
      <value>dr.stack</value>
    </property>
 +  <!-- Mob properties. -->
 +  <property>
 +    <name>hbase.mob.file.cache.size</name>
 +    <value>1000</value>
 +    <description>
 +      Number of opened file handlers to cache.
 +      A larger value will benefit reads by providing more file handlers per 
mob
 +      file cache and would reduce frequent file opening and closing.
 +      However, if this is set too high, this could lead to a "too many opened file handlers" error.
 +      The default value is 1000.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.cache.evict.period</name>
 +    <value>3600</value>
 +    <description>
 +      The amount of time in seconds before the mob cache evicts cached mob 
files.
 +      The default value is 3600 seconds.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.cache.evict.remain.ratio</name>
 +    <value>0.5f</value>
 +    <description>
 +      The ratio (between 0.0 and 1.0) of files that remains cached after an 
eviction
 +      is triggered when the number of cached mob files exceeds the 
hbase.mob.file.cache.size.
 +      The default value is 0.5f.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.sweep.tool.compaction.ratio</name>
 +    <value>0.5f</value>
 +    <description>
 +      If there're too many cells deleted in a mob file, it's regarded
 +      as an invalid file and needs to be merged.
 +      If existingCellsSize/mobFileSize is less than ratio, it's regarded
 +      as an invalid file. The default value is 0.5f.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.sweep.tool.compaction.mergeable.size</name>
 +    <value>134217728</value>
 +    <description>
 +      If the size of a mob file is less than this value, it's regarded as a 
small
 +      file and needs to be merged. The default value is 128MB.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name>
 +    <value>134217728</value>
 +    <description>
 +      The flush size for the memstore used by sweep job. Each sweep reducer 
owns such a memstore.
 +      The default value is 128MB.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.master.mob.ttl.cleaner.period</name>
 +    <value>86400000</value>
 +    <description>
 +      The period that ExpiredMobFileCleanerChore runs. The unit is 
millisecond.
 +      The default value is one day.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.file.compaction.mergeable.threshold</name>
 +    <value>201326592</value>
 +    <description>
 +      If the size of a mob file is less than this value, it's regarded as a 
small
 +      file and needs to be merged in mob file compaction. The default value 
is 192MB.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.delfile.max.count</name>
 +    <value>3</value>
 +    <description>
 +      The max number of del files that is allowed in the mob file compaction.
 +      In the mob file compaction, when the number of existing del files is 
larger than
 +      this value, they are merged until the number of del files is not larger than
 +      this value.
 +      The default value is 3.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.file.compaction.batch.size</name>
 +    <value>100</value>
 +    <description>
 +      The max number of the mob files that is allowed in a batch of the mob 
file compaction.
 +      The mob file compaction merges the small mob files to bigger ones. If 
the number of the
 +      small files is very large, it could lead to a "too many opened file 
handlers" in the merge.
 +      And the merge has to be split into batches. This value limits the 
number of mob files
 +      that are selected in a batch of the mob file compaction. The default 
value is 100.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.master.mob.file.compaction.chore.period</name>
 +    <value>604800000</value>
 +    <description>
 +      The period that MobFileCompactionChore runs. The unit is millisecond.
 +      The default value is one week.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.mob.file.compactor.class</name>
 +    
<value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
 +    <description>
 +      Implementation of mob file compactor, the default one is 
PartitionedMobFileCompactor.
 +    </description>
 +  </property>
 +  <property>
 +    <name>hbase.master.mob.file.compaction.chore.threads.max</name>
 +    <value>1</value>
 +    <description>
 +      The max number of threads used in MobFileCompactionChore.
 +    </description>
 +  </property>
+   <property>
+     <name>hbase.regionserver.handler.abort.on.error.percent</name>
+     <value>0.5</value>
+     <description>The percent of region server RPC threads that must have failed to abort the RS.
+     -1 Disable aborting; 0 Abort if even a single handler has died;
+     0.x Abort only when this percent of handlers have died;
+     1 Abort only when all of the handlers have died.</description>
+   </property>
  </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --cc 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index ee2dcbb,b86a8eb..90a9a09
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@@ -233,37 -257,21 +257,53 @@@ public interface MetricsRegionServerSou
    String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize";
    String MAJOR_COMPACTED_CELLS_SIZE_DESC =
        "The total amount of data processed during major compactions, in bytes";
 +  String MOB_COMPACTED_INTO_MOB_CELLS_COUNT = "mobCompactedIntoMobCellsCount";
 +  String MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC =
 +      "The number of cells moved to mob during compaction";
 +  String MOB_COMPACTED_FROM_MOB_CELLS_COUNT = "mobCompactedFromMobCellsCount";
 +  String MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC =
 +      "The number of cells moved from mob during compaction";
 +  String MOB_COMPACTED_INTO_MOB_CELLS_SIZE = "mobCompactedIntoMobCellsSize";
 +  String MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC =
 +      "The total amount of cells move to mob during compaction, in bytes";
 +  String MOB_COMPACTED_FROM_MOB_CELLS_SIZE = "mobCompactedFromMobCellsSize";
 +  String MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC =
 +      "The total amount of cells move from mob during compaction, in bytes";
 +  String MOB_FLUSH_COUNT = "mobFlushCount";
 +  String MOB_FLUSH_COUNT_DESC = "The number of the flushes in mob-enabled 
stores";
 +  String MOB_FLUSHED_CELLS_COUNT = "mobFlushedCellsCount";
 +  String MOB_FLUSHED_CELLS_COUNT_DESC = "The number of mob cells flushed to 
disk";
 +  String MOB_FLUSHED_CELLS_SIZE = "mobFlushedCellsSize";
 +  String MOB_FLUSHED_CELLS_SIZE_DESC = "The total amount of mob cells flushed 
to disk, in bytes";
 +  String MOB_SCAN_CELLS_COUNT = "mobScanCellsCount";
 +  String MOB_SCAN_CELLS_COUNT_DESC = "The number of scanned mob cells";
 +  String MOB_SCAN_CELLS_SIZE = "mobScanCellsSize";
 +  String MOB_SCAN_CELLS_SIZE_DESC = "The total amount of scanned mob cells, 
in bytes";
 +  String MOB_FILE_CACHE_ACCESS_COUNT = "mobFileCacheAccessCount";
 +  String MOB_FILE_CACHE_ACCESS_COUNT_DESC = "The count of accesses to the mob 
file cache";
 +  String MOB_FILE_CACHE_MISS_COUNT = "mobFileCacheMissCount";
 +  String MOB_FILE_CACHE_MISS_COUNT_DESC = "The count of misses to the mob 
file cache";
 +  String MOB_FILE_CACHE_HIT_PERCENT = "mobFileCacheHitPercent";
 +  String MOB_FILE_CACHE_HIT_PERCENT_DESC = "The hit percent to the mob file 
cache";
 +  String MOB_FILE_CACHE_EVICTED_COUNT = "mobFileCacheEvictedCount";
 +  String MOB_FILE_CACHE_EVICTED_COUNT_DESC = "The number of items evicted 
from the mob file cache";
 +  String MOB_FILE_CACHE_COUNT = "mobFileCacheCount";
 +  String MOB_FILE_CACHE_COUNT_DESC = "The count of cached mob files";
  
+   String HEDGED_READS = "hedgedReads";
+   String HEDGED_READS_DESC = "The number of times we started a hedged read";
+   String HEDGED_READ_WINS = "hedgedReadWins";
+   String HEDGED_READ_WINS_DESC =
+       "The number of times we started a hedged read and a hedged read won";
+ 
+   String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
+   String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests 
because of memstore size is "
+       + "larger than blockingMemStoreSize";
+ 
+   String SPLIT_KEY = "splitTime";
+   String SPLIT_REQUEST_KEY = "splitRequestCount";
+   String SPLIT_REQUEST_DESC = "Number of splits requested";
+   String SPLIT_SUCCESS_KEY = "splitSuccessCounnt";
+   String SPLIT_SUCCESS_DESC = "Number of successfully executed splits";
+   String FLUSH_KEY = "flushTime";
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
----------------------------------------------------------------------
diff --cc 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index ea55fe8,9634be7..2aad115
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@@ -248,72 -253,17 +253,88 @@@ public interface MetricsRegionServerWra
    long getMajorCompactedCellsSize();
  
   /**
 +   * Gets the number of cells move to mob during compaction.
 +   */
 +  long getMobCompactedIntoMobCellsCount();
 +
 +  /**
 +   * Gets the number of cells move from mob during compaction.
 +   */
 +  long getMobCompactedFromMobCellsCount();
 +
 +  /**
 +   * Gets the total amount of cells move to mob during compaction, in bytes.
 +   */
 +  long getMobCompactedIntoMobCellsSize();
 +
 +  /**
 +   * Gets the total amount of cells move from mob during compaction, in bytes.
 +   */
 +  long getMobCompactedFromMobCellsSize();
 +
 +  /**
 +   * Gets the number of the flushes in mob-enabled stores.
 +   */
 +  long getMobFlushCount();
 +
 +  /**
 +   * Gets the number of mob cells flushed to disk.
 +   */
 +  long getMobFlushedCellsCount();
 +
 +  /**
 +   * Gets the total amount of mob cells flushed to disk, in bytes.
 +   */
 +  long getMobFlushedCellsSize();
 +
 +  /**
 +   * Gets the number of scanned mob cells.
 +   */
 +  long getMobScanCellsCount();
 +
 +  /**
 +   * Gets the total amount of scanned mob cells, in bytes.
 +   */
 +  long getMobScanCellsSize();
 +
 +  /**
 +   * Gets the count of accesses to the mob file cache.
 +   */
 +  long getMobFileCacheAccessCount();
 +
 +  /**
 +   * Gets the count of misses to the mob file cache.
 +   */
 +  long getMobFileCacheMissCount();
 +
 +  /**
 +   * Gets the number of items evicted from the mob file cache.
 +   */
 +  long getMobFileCacheEvictedCount();
 +
 +  /**
 +   * Gets the count of cached mob files.
 +   */
 +  long getMobFileCacheCount();
 +
 +  /**
 +   * Gets the hit percent to the mob file cache.
 +   */
 +  int getMobFileCacheHitPercent();
++
++  /**
+    * @return Count of hedged read operations
+    */
+   public long getHedgedReadOps();
+ 
+   /**
+    * @return Count of times a hedged read beat out the primary read.
+    */
+   public long getHedgedReadWins();
+ 
+   /**
+    * @return Count of requests blocked because the memstore size is larger 
than blockingMemStoreSize
+    */
+   public long getBlockedRequestsCount();
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --cc 
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 27d5437,2c1dcd3..45e2699
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@@ -219,35 -249,14 +249,44 @@@ public class MetricsRegionServerSourceI
                rsWrap.getCompactedCellsSize())
            .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, 
MAJOR_COMPACTED_CELLS_SIZE_DESC),
                rsWrap.getMajorCompactedCellsSize())
+ 
 +          .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_COUNT, 
MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC),
 +              rsWrap.getMobCompactedFromMobCellsCount())
 +          .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_COUNT, 
MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC),
 +              rsWrap.getMobCompactedIntoMobCellsCount())
 +          .addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_SIZE, 
MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC),
 +              rsWrap.getMobCompactedFromMobCellsSize())
 +          .addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_SIZE, 
MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC),
 +              rsWrap.getMobCompactedIntoMobCellsSize())
 +          .addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC),
 +              rsWrap.getMobFlushCount())
 +          .addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, 
MOB_FLUSHED_CELLS_COUNT_DESC),
 +              rsWrap.getMobFlushedCellsCount())
 +          .addCounter(Interns.info(MOB_FLUSHED_CELLS_SIZE, 
MOB_FLUSHED_CELLS_SIZE_DESC),
 +              rsWrap.getMobFlushedCellsSize())
 +          .addCounter(Interns.info(MOB_SCAN_CELLS_COUNT, 
MOB_SCAN_CELLS_COUNT_DESC),
 +              rsWrap.getMobScanCellsCount())
 +          .addCounter(Interns.info(MOB_SCAN_CELLS_SIZE, 
MOB_SCAN_CELLS_SIZE_DESC),
 +              rsWrap.getMobScanCellsSize())
 +          .addGauge(Interns.info(MOB_FILE_CACHE_COUNT, 
MOB_FILE_CACHE_COUNT_DESC),
 +              rsWrap.getMobFileCacheCount())
 +          .addCounter(Interns.info(MOB_FILE_CACHE_ACCESS_COUNT, 
MOB_FILE_CACHE_ACCESS_COUNT_DESC),
 +              rsWrap.getMobFileCacheAccessCount())
 +          .addCounter(Interns.info(MOB_FILE_CACHE_MISS_COUNT, 
MOB_FILE_CACHE_MISS_COUNT_DESC),
 +              rsWrap.getMobFileCacheMissCount())
 +          .addCounter(
 +              Interns.info(MOB_FILE_CACHE_EVICTED_COUNT, 
MOB_FILE_CACHE_EVICTED_COUNT_DESC),
 +              rsWrap.getMobFileCacheEvictedCount())
 +          .addGauge(Interns.info(MOB_FILE_CACHE_HIT_PERCENT, 
MOB_FILE_CACHE_HIT_PERCENT_DESC),
 +              rsWrap.getMobFileCacheHitPercent())
++
+           .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), 
rsWrap.getHedgedReadOps())
+           .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+               rsWrap.getHedgedReadWins())
+ 
+           .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, 
BLOCKED_REQUESTS_COUNT_DESC),
+             rsWrap.getBlockedRequestsCount())
+ 
            .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
                rsWrap.getZookeeperQuorum())
            .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), 
rsWrap.getServerName())

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java
----------------------------------------------------------------------
diff --cc 
hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java
index f8a4412,0000000..85c01cc
mode 100644,000000..100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithMOB.java
@@@ -1,154 -1,0 +1,155 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import org.apache.commons.cli.CommandLine;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hbase.client.HBaseAdmin;
++import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithMOB;
 +import org.apache.hadoop.hbase.util.LoadTestTool;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +/**
 + * Integration Test for MOB ingest.
 + */
 +@Category(IntegrationTests.class)
 +public class IntegrationTestIngestWithMOB extends IntegrationTestIngest {
 +  private static final char COLON = ':';
 +
 +  private byte[] mobColumnFamily = LoadTestTool.COLUMN_FAMILY;
 +  public static final String THRESHOLD = "threshold";
 +  public static final String MIN_MOB_DATA_SIZE = "minMobDataSize";
 +  public static final String MAX_MOB_DATA_SIZE = "maxMobDataSize";
 +  private int threshold = 1024; // 1KB
 +  private int minMobDataSize = 512; // 512B
 +  private int maxMobDataSize = threshold * 5; // 5KB
 +  private static final long JUNIT_RUN_TIME = 2 * 60 * 1000; // 2 minutes
 +
 +  //similar to LOAD_TEST_TOOL_INIT_ARGS except OPT_IN_MEMORY is removed
 +  protected String[] LOAD_TEST_TOOL_MOB_INIT_ARGS = {
 +      LoadTestTool.OPT_COMPRESSION,
 +      LoadTestTool.OPT_DATA_BLOCK_ENCODING,
 +      LoadTestTool.OPT_ENCRYPTION,
 +      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
 +      LoadTestTool.OPT_REGION_REPLICATION,
 +  };
 +
 +  @Override
 +  protected String[] getArgsForLoadTestToolInitTable() {
 +    List<String> args = new ArrayList<String>();
 +    args.add("-tn");
 +    args.add(getTablename().getNameAsString());
 +    // pass all remaining args from conf with keys <test class name>.<load 
test tool arg>
 +    String clazz = this.getClass().getSimpleName();
 +    for (String arg : LOAD_TEST_TOOL_MOB_INIT_ARGS) {
 +      String val = conf.get(String.format("%s.%s", clazz, arg));
 +      if (val != null) {
 +        args.add("-" + arg);
 +        args.add(val);
 +      }
 +    }
 +    args.add("-init_only");
 +    return args.toArray(new String[args.size()]);
 +  }
 +
 +  @Override
 +  protected void addOptions() {
 +    super.addOptions();
 +    super.addOptWithArg(THRESHOLD, "The threshold to classify cells to mob 
data");
 +    super.addOptWithArg(MIN_MOB_DATA_SIZE, "Minimum value size for mob data");
 +    super.addOptWithArg(MAX_MOB_DATA_SIZE, "Maximum value size for mob data");
 +  }
 +
 +  @Override
 +  protected void processOptions(CommandLine cmd) {
 +    super.processOptions(cmd);
 +    if (cmd.hasOption(THRESHOLD)) {
 +      threshold = Integer.parseInt(cmd.getOptionValue(THRESHOLD));
 +    }
 +    if (cmd.hasOption(MIN_MOB_DATA_SIZE)) {
 +      minMobDataSize = 
Integer.parseInt(cmd.getOptionValue(MIN_MOB_DATA_SIZE));
 +    }
 +    if (cmd.hasOption(MAX_MOB_DATA_SIZE)) {
 +      maxMobDataSize = 
Integer.parseInt(cmd.getOptionValue(MAX_MOB_DATA_SIZE));
 +    }
 +    if (minMobDataSize > maxMobDataSize) {
 +      throw new IllegalArgumentException(
 +          "The minMobDataSize should not be larger than maxMobDataSize");
 +    }
 +  }
 +
 +  @Test
 +  public void testIngest() throws Exception {
 +    runIngestTest(JUNIT_RUN_TIME, 100, 10, 1024, 10, 20);
 +  };
 +
 +  @Override
 +  protected void initTable() throws IOException {
 +    super.initTable();
 +
 +    byte[] tableName = getTablename().getName();
 +    HBaseAdmin admin = new HBaseAdmin(conf);
 +    HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
 +    LOG.info("Disabling table " + getTablename());
 +    admin.disableTable(tableName);
 +    for (HColumnDescriptor columnDescriptor : tableDesc.getFamilies()) {
 +      if(Arrays.equals(columnDescriptor.getName(), mobColumnFamily)) {
 +        columnDescriptor.setMobEnabled(true);
 +        columnDescriptor.setMobThreshold((long) threshold);
 +        admin.modifyColumn(tableName, columnDescriptor);
 +      }
 +    }
 +    LOG.info("Enabling table " + getTablename());
 +    admin.enableTable(tableName);
 +    admin.close();
 +  }
 +
 +  @Override
 +  protected String[] getArgsForLoadTestTool(String mode, String 
modeSpecificArg, long startKey,
 +      long numKeys) {
 +    String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, 
startKey, numKeys);
 +    List<String> tmp = new ArrayList<String>(Arrays.asList(args));
 +    // LoadTestDataGeneratorMOB:mobColumnFamily:minMobDataSize:maxMobDataSize
 +    tmp.add(HIPHEN + LoadTestTool.OPT_GENERATOR);
 +    StringBuilder sb = new 
StringBuilder(LoadTestDataGeneratorWithMOB.class.getName());
 +    sb.append(COLON);
 +    sb.append(Bytes.toString(mobColumnFamily));
 +    sb.append(COLON);
 +    sb.append(minMobDataSize);
 +    sb.append(COLON);
 +    sb.append(maxMobDataSize);
 +    tmp.add(sb.toString());
 +    return tmp.toArray(new String[tmp.size()]);
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    Configuration conf = HBaseConfiguration.create();
 +    IntegrationTestingUtility.setUseDistributedCluster(conf);
 +    int ret = ToolRunner.run(conf, new IntegrationTestIngestWithMOB(), args);
 +    System.exit(ret);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
index 6e97a76,ff33951..a950dce
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
@@@ -94,37 -92,41 +94,45 @@@ public class HFileLink extends FileLin
    private final Path tempPath;
  
    /**
-    * @param conf {@link Configuration} from which to extract specific archive 
locations
-    * @param path The path of the HFile Link.
-    * @throws IOException on unexpected error.
+    * Dead simple hfile link constructor
     */
-   public HFileLink(Configuration conf, Path path) throws IOException {
-     this(FSUtils.getRootDir(conf), HFileArchiveUtil.getArchivePath(conf), 
path);
 -  public HFileLink(final Path originPath, final Path tempPath,
++  public HFileLink(final Path originPath, final Path tempPath, final Path 
mobPath,
+                    final Path archivePath) {
 -    this.tempPath  = tempPath;
++    this.tempPath = tempPath;
+     this.originPath = originPath;
++    this.mobPath = mobPath;
+     this.archivePath = archivePath;
 -
 -    setLocations(originPath, tempPath, archivePath);
++    setLocations(originPath, tempPath, mobPath, archivePath);
    }
  
++
    /**
-    * @param rootDir Path to the root directory where hbase files are stored
-    * @param archiveDir Path to the hbase archive directory
-    * @param mobDir path to the hbase mob directory
-    * @param path The path of the HFile Link.
+    * @param conf {@link Configuration} from which to extract specific archive 
locations
+    * @param hFileLinkPattern The path ending with a HFileLink pattern. 
(table=region-hfile)
+    * @throws IOException on unexpected error.
     */
-     public HFileLink(final Path rootDir, final Path archiveDir, final Path 
mobDir, final Path path) {
-         Path hfilePath = getRelativeTablePath(path);
-         this.tempPath = new Path(new Path(rootDir, 
HConstants.HBASE_TEMP_DIRECTORY), hfilePath);
-         this.originPath = new Path(rootDir, hfilePath);
-         this.mobPath = new Path(mobDir, hfilePath);
-         this.archivePath = new Path(archiveDir, hfilePath);
-         setLocations(originPath, mobPath, tempPath, archivePath);
-     }
+   public static final HFileLink buildFromHFileLinkPattern(Configuration conf, 
Path hFileLinkPattern)
+           throws IOException {
+     return buildFromHFileLinkPattern(FSUtils.getRootDir(conf),
+             HFileArchiveUtil.getArchivePath(conf), hFileLinkPattern);
+   }
  
 +
-     /**
++
+   /**
     * @param rootDir Path to the root directory where hbase files are stored
     * @param archiveDir Path to the hbase archive directory
-    * @param path The path of the HFile Link.
+    * @param hFileLinkPattern The path of the HFile Link.
     */
-   public HFileLink(final Path rootDir, final Path archiveDir, final Path 
path) {
-     this(rootDir, archiveDir, new Path(rootDir, MobConstants.MOB_DIR_NAME), 
path);
+   public final static HFileLink buildFromHFileLinkPattern(final Path rootDir,
+                                                           final Path 
archiveDir,
+                                                           final Path 
hFileLinkPattern) {
+     Path hfilePath = getHFileLinkPatternRelativePath(hFileLinkPattern);
+     Path tempPath = new Path(new Path(rootDir, 
HConstants.HBASE_TEMP_DIRECTORY), hfilePath);
+     Path originPath = new Path(rootDir, hfilePath);
++    Path mobPath = new Path(new Path(rootDir, MobConstants.MOB_DIR_NAME), 
hfilePath);
+     Path archivePath = new Path(archiveDir, hfilePath);
 -    return new HFileLink(originPath, tempPath, archivePath);
++    return new HFileLink(originPath, tempPath, mobPath, archivePath);
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index cc0d0aa,0000000..427f2cd
mode 100644,000000..100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@@ -1,103 -1,0 +1,104 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.hbase.master;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.hbase.Chore;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
++import org.apache.hadoop.hbase.ScheduledChore;
 +import org.apache.hadoop.hbase.TableDescriptors;
 +import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +
 +/**
 + * The Class ExpiredMobFileCleanerChore for running cleaner regularly to 
remove the expired
 + * mob files.
 + */
 [email protected]
- public class ExpiredMobFileCleanerChore extends Chore {
++public class ExpiredMobFileCleanerChore extends ScheduledChore {
 +
 +  private static final Log LOG = 
LogFactory.getLog(ExpiredMobFileCleanerChore.class);
 +  private final HMaster master;
 +  private TableLockManager tableLockManager;
 +  private ExpiredMobFileCleaner cleaner;
 +
 +  public ExpiredMobFileCleanerChore(HMaster master) {
-     super(master.getServerName() + "-ExpiredMobFileCleanerChore", 
master.getConfiguration().getInt(
-         MobConstants.MOB_CLEANER_PERIOD, 
MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master);
++    super(master.getServerName() + "-ExpiredMobFileCleanerChore", master,
++        master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
++                MobConstants.DEFAULT_MOB_CLEANER_PERIOD));
 +    this.master = master;
 +    this.tableLockManager = master.getTableLockManager();
 +    cleaner = new ExpiredMobFileCleaner();
 +  }
 +
 +  @Override
 +  protected void chore() {
 +    try {
 +      TableDescriptors htds = master.getTableDescriptors();
 +      Map<String, HTableDescriptor> map = htds.getAll();
 +      for (HTableDescriptor htd : map.values()) {
 +        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
 +          if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
 +            // clean only for mob-enabled column.
 +            // obtain a read table lock before cleaning, synchronize with 
MobFileCompactionChore.
 +            boolean tableLocked = false;
 +            TableLock lock = null;
 +            try {
 +              // the tableLockManager might be null in testing. In that case, 
it is lock-free.
 +              if (tableLockManager != null) {
 +                lock = 
tableLockManager.readLock(MobUtils.getTableLockName(htd.getTableName()),
 +                  "Run ExpiredMobFileCleanerChore");
 +                lock.acquire();
 +              }
 +              tableLocked = true;
 +              
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
 +            } catch (LockTimeoutException e) {
 +              LOG.info("Fail to acquire the lock because of timeout, maybe a"
 +                + " MobFileCompactor is running", e);
 +            } catch (Exception e) {
 +              LOG.error(
 +                "Fail to clean the expired mob files for the column " + 
hcd.getNameAsString()
 +                  + " in the table " + htd.getNameAsString(), e);
 +            } finally {
 +              if (lock != null && tableLocked) {
 +                try {
 +                  lock.release();
 +                } catch (IOException e) {
 +                  LOG.error(
 +                    "Fail to release the write lock for the table " + 
htd.getNameAsString(), e);
 +                }
 +              }
 +            }
 +          }
 +        }
 +      }
 +    } catch (Exception e) {
 +      LOG.error("Fail to clean the expired mob files", e);
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7ad49a3,61a1c66..aea7e7f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@@ -612,11 -767,6 +769,12 @@@ public class HMaster extends HRegionSer
      // master initialization. See HBASE-5916.
      
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
  
 +    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
-     Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
++    getChoreService().scheduleChore(expiredMobFileCleanerChore);
++
 +    this.mobFileCompactChore = new MobFileCompactionChore(this);
-     Threads.setDaemonThreadRunning(mobFileCompactChore.getThread());
++    getChoreService().scheduleChore(mobFileCompactChore);
 +
      if (this.cpHost != null) {
        // don't let cp initialization errors kill the master
        try {
@@@ -863,23 -1061,17 +1069,23 @@@
    }
  
    private void stopChores() {
 +    if (this.expiredMobFileCleanerChore != null) {
-       this.expiredMobFileCleanerChore.interrupt();
++      this.expiredMobFileCleanerChore.cancel(true);
 +    }
 +    if (this.mobFileCompactChore != null) {
-       this.mobFileCompactChore.interrupt();
++      this.mobFileCompactChore.cancel(true);
 +    }
      if (this.balancerChore != null) {
-       this.balancerChore.interrupt();
+       this.balancerChore.cancel(true);
      }
      if (this.clusterStatusChore != null) {
-       this.clusterStatusChore.interrupt();
+       this.clusterStatusChore.cancel(true);
      }
      if (this.catalogJanitorChore != null) {
-       this.catalogJanitorChore.interrupt();
+       this.catalogJanitorChore.cancel(true);
      }
      if (this.clusterStatusPublisherChore != null){
-       clusterStatusPublisherChore.interrupt();
+       clusterStatusPublisherChore.cancel(true);
      }
    }
  

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
index 12491af,0000000..ce2df81
mode 100644,000000..100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
@@@ -1,166 -1,0 +1,166 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.master;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.RejectedExecutionHandler;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.hbase.Chore;
++import org.apache.hadoop.hbase.ScheduledChore;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HTableDescriptor;
 +import org.apache.hadoop.hbase.TableDescriptors;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
 +import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
 +import 
org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
 +import org.apache.hadoop.hbase.util.ReflectionUtils;
 +import org.apache.hadoop.hbase.util.Threads;
 +
 +/**
 + * The Class MobFileCompactChore for running compaction regularly to merge 
small mob files.
 + */
 [email protected]
- public class MobFileCompactionChore extends Chore{
++public class MobFileCompactionChore extends ScheduledChore {
 +
 +  private static final Log LOG = 
LogFactory.getLog(MobFileCompactionChore.class);
 +  private HMaster master;
 +  private TableLockManager tableLockManager;
 +  private ExecutorService pool;
 +
 +  public MobFileCompactionChore(HMaster master) {
-     super(master.getServerName() + "-MobFileCompactChore", 
master.getConfiguration().getInt(
-       MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
-       MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master);
++    super(master.getServerName() + "-MobFileCompactChore", master,
++        
master.getConfiguration().getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
++      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD));
 +    this.master = master;
 +    this.tableLockManager = master.getTableLockManager();
 +    this.pool = createThreadPool();
 +  }
 +
 +  @Override
 +  protected void chore() {
 +    try {
 +      String className = 
master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
 +        PartitionedMobFileCompactor.class.getName());
 +      TableDescriptors htds = master.getTableDescriptors();
 +      Map<String, HTableDescriptor> map = htds.getAll();
 +      for (HTableDescriptor htd : map.values()) {
 +        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
 +          if (!hcd.isMobEnabled()) {
 +            continue;
 +          }
 +          // instantiate the mob file compactor.
 +          MobFileCompactor compactor = null;
 +          try {
 +            compactor = ReflectionUtils.instantiateWithCustomCtor(className, 
new Class[] {
 +              Configuration.class, FileSystem.class, TableName.class, 
HColumnDescriptor.class,
 +              ExecutorService.class },
 +              new Object[] { master.getConfiguration(), 
master.getFileSystem(), htd.getTableName(),
 +                hcd, pool });
 +          } catch (Exception e) {
 +            throw new IOException("Unable to load configured mob file 
compactor '" + className
 +              + "'", e);
 +          }
 +          // compact only for mob-enabled column.
 +          // obtain a write table lock before performing compaction to avoid 
race condition
 +          // with major compaction in mob-enabled column.
 +          boolean tableLocked = false;
 +          TableLock lock = null;
 +          try {
 +            // the tableLockManager might be null in testing. In that case, 
it is lock-free.
 +            if (tableLockManager != null) {
 +              lock = 
tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()),
 +                "Run MobFileCompactChore");
 +              lock.acquire();
 +            }
 +            tableLocked = true;
 +            compactor.compact();
 +          } catch (LockTimeoutException e) {
 +            LOG.info("Fail to acquire the lock because of timeout, maybe a 
major compaction or an"
 +              + " ExpiredMobFileCleanerChore is running", e);
 +          } catch (Exception e) {
 +            LOG.error("Fail to compact the mob files for the column " + 
hcd.getNameAsString()
 +              + " in the table " + htd.getNameAsString(), e);
 +          } finally {
 +            if (lock != null && tableLocked) {
 +              try {
 +                lock.release();
 +              } catch (IOException e) {
 +                LOG.error(
 +                  "Fail to release the write lock for the table " + 
htd.getNameAsString(), e);
 +              }
 +            }
 +          }
 +        }
 +      }
 +    } catch (Exception e) {
 +      LOG.error("Fail to clean the expired mob files", e);
 +    }
 +  }
 +
 +  @Override
 +  protected void cleanup() {
 +    super.cleanup();
 +    pool.shutdown();
 +  }
 +
 +  /**
 +   * Creates a thread pool.
 +   * @return A thread pool.
 +   */
 +  private ExecutorService createThreadPool() {
 +    Configuration conf = master.getConfiguration();
 +    int maxThreads = 
conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX);
 +    if (maxThreads == 0) {
 +      maxThreads = 1;
 +    }
 +    long keepAliveTime = 
conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME);
 +    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
 +    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 
keepAliveTime,
 +      TimeUnit.SECONDS, queue, 
Threads.newDaemonThreadFactory("MobFileCompactionChore"),
 +      new RejectedExecutionHandler() {
 +        @Override
 +        public void rejectedExecution(Runnable r, ThreadPoolExecutor 
executor) {
 +          try {
 +            // waiting for a thread to pick up instead of throwing exceptions.
 +            queue.put(r);
 +          } catch (InterruptedException e) {
 +            throw new RejectedExecutionException(e);
 +          }
 +        }
 +    });
 +    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
 +    return pool;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 95e89d2,092a17d..0664a55
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@@ -24,29 -25,30 +25,28 @@@ import java.util.List
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.classification.InterfaceAudience;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.hbase.CoordinatedStateException;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.TableName;
--import org.apache.hadoop.hbase.HRegionInfo;
--import org.apache.hadoop.hbase.HTableDescriptor;
 -import org.apache.hadoop.hbase.MetaTableAccessor;
--import org.apache.hadoop.hbase.Server;
 -import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.*;
  import org.apache.hadoop.hbase.backup.HFileArchiver;
- import org.apache.hadoop.hbase.MetaTableAccessor;
+ import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Delete;
+ import org.apache.hadoop.hbase.client.Result;
+ import org.apache.hadoop.hbase.client.ResultScanner;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.executor.EventType;
- import org.apache.hadoop.hbase.regionserver.HRegion;
- import org.apache.hadoop.hbase.util.FSUtils;
  import org.apache.hadoop.hbase.master.AssignmentManager;
  import org.apache.hadoop.hbase.master.HMaster;
  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
  import org.apache.hadoop.hbase.master.MasterFileSystem;
  import org.apache.hadoop.hbase.master.MasterServices;
- import org.apache.hadoop.hbase.master.RegionStates;
  import org.apache.hadoop.hbase.master.RegionState.State;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.master.RegionStates;
+ import org.apache.hadoop.hbase.regionserver.HRegion;
++import org.apache.hadoop.hbase.util.FSUtils;
  
  @InterfaceAudience.Private
  public class DeleteTableHandler extends TableEventHandler {
@@@ -135,59 -165,37 +163,63 @@@
     */
    protected void removeTableData(final List<HRegionInfo> regions)
        throws IOException, CoordinatedStateException {
-     // 1. Remove regions from META
-     LOG.debug("Deleting regions from META");
-     MetaTableAccessor.deleteRegions(this.server.getShortCircuitConnection(), 
regions);
- 
-     // -----------------------------------------------------------------------
-     // NOTE: At this point we still have data on disk, but nothing in 
hbase:meta
-     //       if the rename below fails, hbck will report an inconsistency.
-     // -----------------------------------------------------------------------
- 
-     // 2. Move the table in /hbase/.tmp
-     MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
-     Path tempTableDir = mfs.moveTableToTemp(tableName);
- 
-     // 3. Archive regions from FS (temp directory)
-     FileSystem fs = mfs.getFileSystem();
-     for (HRegionInfo hri: regions) {
-       LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from 
FS");
-       HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
-           tempTableDir, HRegion.getRegionDir(tempTableDir, 
hri.getEncodedName()));
-     }
+     try {
+       // 1. Remove regions from META
+       LOG.debug("Deleting regions from META");
+       MetaTableAccessor.deleteRegions(this.server.getConnection(), regions);
+ 
+       // 
-----------------------------------------------------------------------
+       // NOTE: At this point we still have data on disk, but nothing in 
hbase:meta
+       //       if the rename below fails, hbck will report an inconsistency.
+       // 
-----------------------------------------------------------------------
+ 
+       // 2. Move the table in /hbase/.tmp
+       MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
+       Path tempTableDir = mfs.moveTableToTemp(tableName);
+ 
+       // 3. Archive regions from FS (temp directory)
+       FileSystem fs = mfs.getFileSystem();
+       for (HRegionInfo hri : regions) {
+         LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from 
FS");
+         HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
+             tempTableDir, HRegion.getRegionDir(tempTableDir, 
hri.getEncodedName()));
+       }
  
-     // Archive the mob data if there is a mob-enabled column
-     HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
-     boolean hasMob = false;
-     for (HColumnDescriptor hcd : hcds) {
-       if (hcd.isMobEnabled()) {
-         hasMob = true;
-         break;
++      // Archive the mob data if there is a mob-enabled column
++      HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
++      boolean hasMob = false;
++      for (HColumnDescriptor hcd : hcds) {
++        if (hcd.isMobEnabled()) {
++          hasMob = true;
++          break;
++        }
 +      }
-     }
-     Path mobTableDir = null;
-     if (hasMob) {
-       // Archive mob data
-       mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), 
MobConstants.MOB_DIR_NAME),
-           tableName);
-       Path regionDir =
-           new Path(mobTableDir, 
MobUtils.getMobRegionInfo(tableName).getEncodedName());
-       if (fs.exists(regionDir)) {
-         HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, 
regionDir);
++      Path mobTableDir = null;
++      if (hasMob) {
++        // Archive mob data
++        mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), 
MobConstants.MOB_DIR_NAME),
++                tableName);
++        Path regionDir =
++                new Path(mobTableDir, 
MobUtils.getMobRegionInfo(tableName).getEncodedName());
++        if (fs.exists(regionDir)) {
++          HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, 
regionDir);
++        }
 +      }
-     }
-     // 4. Delete table directory from FS (temp directory)
-     if (!fs.delete(tempTableDir, true)) {
-       LOG.error("Couldn't delete " + tempTableDir);
-     }
-     // Delete the table directory where the mob files are saved
-     if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
-       if (!fs.delete(mobTableDir, true)) {
-         LOG.error("Couldn't delete " + mobTableDir);
+       // 4. Delete table directory from FS (temp directory)
+       if (!fs.delete(tempTableDir, true)) {
+         LOG.error("Couldn't delete " + tempTableDir);
+       }
++      // Delete the table directory where the mob files are saved
++      if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
++        if (!fs.delete(mobTableDir, true)) {
++          LOG.error("Couldn't delete " + mobTableDir);
++        }
 +      }
-     }
  
-     LOG.debug("Table '" + tableName + "' archived!");
+       LOG.debug("Table '" + tableName + "' archived!");
+     } finally {
+       cleanupTableState();
+     }
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
index d91d825,e70e1c8..1f2ed0a
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
@@@ -25,10 -25,9 +25,10 @@@ import java.util.concurrent.ThreadPoolE
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.classification.InterfaceStability;
- import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.classification.InterfaceStability;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.client.RegionReplicaUtil;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index ec0cfe5,0000000..c2abc7c
mode 100644,000000..100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@@ -1,304 -1,0 +1,308 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.regionserver.HMobStore;
 +import org.apache.hadoop.hbase.regionserver.HStore;
 +import org.apache.hadoop.hbase.regionserver.InternalScanner;
 +import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
 +import org.apache.hadoop.hbase.regionserver.ScanType;
 +import org.apache.hadoop.hbase.regionserver.Store;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 +import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
++import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 +import org.apache.hadoop.hbase.util.Bytes;
 +
 +/**
 + * Compact passed set of files in the mob-enabled column family.
 + */
 [email protected]
 +public class DefaultMobCompactor extends DefaultCompactor {
 +
 +  private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
 +  private long mobSizeThreshold;
 +  private HMobStore mobStore;
 +  public DefaultMobCompactor(Configuration conf, Store store) {
 +    super(conf, store);
 +    // The mob cells reside in the mob-enabled column family which is held by 
HMobStore.
 +    // During the compaction, the compactor reads the cells from the mob 
files and
 +    // probably creates new mob files. All of these operations are included 
in HMobStore,
 +    // so we need to cast the Store to HMobStore.
 +    if (!(store instanceof HMobStore)) {
 +      throw new IllegalArgumentException("The store " + store + " is not a 
HMobStore");
 +    }
 +    mobStore = (HMobStore) store;
 +    mobSizeThreshold = store.getFamily().getMobThreshold();
 +  }
 +
 +  /**
 +   * Creates a writer for a new file in a temporary directory.
 +   * @param fd The file details.
 +   * @param smallestReadPoint The smallest mvcc readPoint across all the 
scanners in this region.
 +   * @return Writer for a new StoreFile in the tmp dir.
 +   * @throws IOException
 +   */
 +  @Override
 +  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) 
throws IOException {
 +    // make this writer with tags always because of possible new cells with 
tags.
 +    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, 
this.compactionCompression,
 +        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
 +    return writer;
 +  }
 +
 +  @Override
 +  protected InternalScanner createScanner(Store store, List<StoreFileScanner> 
scanners,
 +      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws 
IOException {
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(store.getFamily().getMaxVersions());
 +    if (scanType == ScanType.COMPACT_DROP_DELETES) {
 +      scanType = ScanType.COMPACT_RETAIN_DELETES;
 +      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, 
scanners,
 +          scanType, smallestReadPoint, earliestPutTs, true);
 +    } else {
 +      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, 
scanners,
 +          scanType, smallestReadPoint, earliestPutTs, false);
 +    }
 +  }
 +
++  // TODO refactor to take advantage of the throughput controller.
++
 +  /**
 +   * Performs compaction on a column family with the mob flag enabled.
 +   * This is for when the mob threshold size has changed or if the mob
 +   * column family mode has been toggled via an alter table statement.
 +   * Compacts the files by the following rules.
 +   * 1. If the cell has a mob reference tag, the cell's value is the path of 
the mob file.
 +   * <ol>
 +   * <li>
 +   * If the value size of a cell is larger than the threshold, this cell is 
regarded as a mob,
 +   * directly copy the (with mob tag) cell into the new store file.
 +   * </li>
 +   * <li>
 +   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of 
the cell into
 +   * the new store file.
 +   * </li>
 +   * </ol>
 +   * 2. If the cell doesn't have a reference tag.
 +   * <ol>
 +   * <li>
 +   * If the value size of a cell is larger than the threshold, this cell is 
regarded as a mob,
 +   * write this cell to a mob file, and write the path of this mob file to 
the store file.
 +   * </li>
 +   * <li>
 +   * Otherwise, directly write this cell into the store file.
 +   * </li>
 +   * </ol>
 +   * In the mob compaction, the {@link MobCompactionStoreScanner} is used as 
a scanner
 +   * which could output the normal cells and delete markers together when 
required.
 +   * After the major compaction on the normal hfiles, we have a guarantee 
that we have purged all
 +   * deleted or old version mob refs, and the delete markers are written to a 
del file with the
 +   * suffix _del. Because of this, it is safe to use the del file in the mob 
compaction.
 +   * The mob compaction doesn't take place in the normal hfiles, it occurs 
directly in the
 +   * mob files. When the small mob files are merged into bigger ones, the del 
file is added into
 +   * the scanner to filter the deleted cells.
 +   * @param fd File details
 +   * @param scanner Where to read from.
 +   * @param writer Where to write to.
 +   * @param smallestReadPoint Smallest read point.
 +   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which 
is <= smallestReadPoint
 +   * @param major Is a major compaction.
 +   * @return Whether compaction ended; false if it was interrupted for any 
reason.
 +   */
 +  @Override
 +  protected boolean performCompaction(FileDetails fd, InternalScanner 
scanner, CellSink writer,
-       long smallestReadPoint, boolean cleanSeqId, boolean major) throws 
IOException {
++      long smallestReadPoint, boolean cleanSeqId,
++      CompactionThroughputController throughputController,  boolean major) 
throws IOException {
 +    if (!(scanner instanceof MobCompactionStoreScanner)) {
 +      throw new IllegalArgumentException(
 +          "The scanner should be an instance of MobCompactionStoreScanner");
 +    }
 +    MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) 
scanner;
 +    int bytesWritten = 0;
 +    // Since scanner.next() can return 'false' but still be delivering data,
 +    // we have to use a do/while loop.
 +    List<Cell> cells = new ArrayList<Cell>();
 +    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
 +    int closeCheckInterval = HStore.getCloseCheckInterval();
 +    boolean hasMore;
 +    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), 
store.getColumnFamilyName());
 +    byte[] fileName = null;
 +    StoreFile.Writer mobFileWriter = null;
 +    StoreFile.Writer delFileWriter = null;
 +    long mobCells = 0;
 +    long deleteMarkersCount = 0;
 +    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, 
store.getTableName()
 +            .getName());
 +    long mobCompactedIntoMobCellsCount = 0;
 +    long mobCompactedFromMobCellsCount = 0;
 +    long mobCompactedIntoMobCellsSize = 0;
 +    long mobCompactedFromMobCellsSize = 0;
 +    try {
 +      try {
 +        // If the mob file writer could not be created, directly write the 
cell to the store file.
 +        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), 
fd.maxKeyCount,
 +            store.getFamily().getCompression(), 
store.getRegionInfo().getStartKey());
 +        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
 +      } catch (IOException e) {
 +        LOG.error(
 +            "Fail to create mob writer, "
 +                + "we will continue the compaction by writing MOB cells 
directly in store files",
 +            e);
 +      }
 +      delFileWriter = mobStore.createDelFileWriterInTmp(new 
Date(fd.latestPutTs), fd.maxKeyCount,
 +          store.getFamily().getCompression(), 
store.getRegionInfo().getStartKey());
 +      do {
 +        hasMore = compactionScanner.next(cells, compactionKVMax);
 +        // output to writer:
 +        for (Cell c : cells) {
-           // TODO remove the KeyValueUtil.ensureKeyValue before merging back 
to trunk.
-           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-           resetSeqId(smallestReadPoint, cleanSeqId, kv);
++          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
++            CellUtil.setSequenceId(c, 0);
++          }
 +          if (compactionScanner.isOutputDeleteMarkers() && 
CellUtil.isDelete(c)) {
-             delFileWriter.append(kv);
++            delFileWriter.append(c);
 +            deleteMarkersCount++;
-           } else if (mobFileWriter == null || kv.getTypeByte() != 
KeyValue.Type.Put.getCode()) {
++          } else if (mobFileWriter == null || c.getTypeByte() != 
KeyValue.Type.Put.getCode()) {
 +            // If the mob file writer is null or the kv type is not put, 
directly write the cell
 +            // to the store file.
-             writer.append(kv);
-           } else if (MobUtils.isMobReferenceCell(kv)) {
-             if (MobUtils.hasValidMobRefCellValue(kv)) {
-               int size = MobUtils.getMobValueLength(kv);
++            writer.append(c);
++          } else if (MobUtils.isMobReferenceCell(c)) {
++            if (MobUtils.hasValidMobRefCellValue(c)) {
++              int size = MobUtils.getMobValueLength(c);
 +              if (size > mobSizeThreshold) {
 +                // If the value size is larger than the threshold, it's 
regarded as a mob. Since
 +                // its value is already in the mob file, directly write this 
cell to the store file
-                 writer.append(kv);
++                writer.append(c);
 +              } else {
 +                // If the value is not larger than the threshold, it's not 
regarded a mob. Retrieve
 +                // the mob cell from the mob file, and write it back to the 
store file.
-                 Cell cell = mobStore.resolve(kv, false);
-                 if (cell.getValueLength() != 0) {
++                Cell mobCell = mobStore.resolve(c, false);
++                if (mobCell.getValueLength() != 0) {
 +                  // put the mob data back to the store file
-                   KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
-                   mobKv.setSequenceId(kv.getSequenceId());
-                   writer.append(mobKv);
++                  // KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
++                  CellUtil.setSequenceId(mobCell, c.getSequenceId());
++                  writer.append(mobCell);
 +                  mobCompactedFromMobCellsCount++;
-                   mobCompactedFromMobCellsSize += cell.getValueLength();
++                  mobCompactedFromMobCellsSize += mobCell.getValueLength();
 +                } else {
 +                  // If the value of a file is empty, there might be issues 
when retrieving,
 +                  // directly write the cell to the store file, and leave it 
to be handled by the
 +                  // next compaction.
-                   writer.append(kv);
++                  writer.append(c);
 +                }
 +              }
 +            } else {
-               LOG.warn("The value format of the KeyValue " + kv
++              LOG.warn("The value format of the KeyValue " + c
 +                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
-               writer.append(kv);
++              writer.append(c);
 +            }
-           } else if (kv.getValueLength() <= mobSizeThreshold) {
++          } else if (c.getValueLength() <= mobSizeThreshold) {
 +            // If the value size of a cell is not larger than the threshold, 
directly write it to
 +            // the store file.
-             writer.append(kv);
++            writer.append(c);
 +          } else {
 +            // If the value size of a cell is larger than the threshold, it's 
regarded as a mob,
 +            // write this cell to a mob file, and write the path to the store 
file.
 +            mobCells++;
 +            // append the original keyValue in the mob file.
-             mobFileWriter.append(kv);
-             KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, 
tableNameTag);
++            mobFileWriter.append(c);
++            KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, 
tableNameTag);
 +            // write the cell whose value is the path of a mob file to the 
store file.
 +            writer.append(reference);
 +            mobCompactedIntoMobCellsCount++;
-             mobCompactedIntoMobCellsSize += kv.getValueLength();
++            mobCompactedIntoMobCellsSize += c.getValueLength();
 +          }
 +          ++progress.currentCompactedKVs;
 +
 +          // check periodically to see if a system stop is requested
 +          if (closeCheckInterval > 0) {
-             bytesWritten += kv.getLength();
++            bytesWritten += KeyValueUtil.length(c);
 +            if (bytesWritten > closeCheckInterval) {
 +              bytesWritten = 0;
 +              if (!store.areWritesEnabled()) {
 +                progress.cancel();
 +                return false;
 +              }
 +            }
 +          }
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      if (mobFileWriter != null) {
 +        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
 +        mobFileWriter.close();
 +      }
 +      if (delFileWriter != null) {
 +        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
 +        delFileWriter.close();
 +      }
 +    }
 +    if (mobFileWriter != null) {
 +      if (mobCells > 0) {
 +        // If the mob file is not empty, commit it.
 +        mobStore.commitFile(mobFileWriter.getPath(), path);
 +      } else {
 +        try {
 +          // If the mob file is empty, delete it instead of committing.
 +          store.getFileSystem().delete(mobFileWriter.getPath(), true);
 +        } catch (IOException e) {
 +          LOG.error("Fail to delete the temp mob file", e);
 +        }
 +      }
 +    }
 +    if (delFileWriter != null) {
 +      if (deleteMarkersCount > 0) {
 +        // If the del file is not empty, commit it.
 +        // If the commit fails, the compaction is re-performed again.
 +        mobStore.commitFile(delFileWriter.getPath(), path);
 +      } else {
 +        try {
 +          // If the del file is empty, delete it instead of committing.
 +          store.getFileSystem().delete(delFileWriter.getPath(), true);
 +        } catch (IOException e) {
 +          LOG.error("Fail to delete the temp del file", e);
 +        }
 +      }
 +    }
 +    
mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount);
 +    
mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount);
 +    mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize);
 +    mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize);
 +    progress.complete();
 +    return true;
 +  }
 +}

Reply via email to