HBASE-11339 Merge remote-tracking branch 'apache/hbase-11339' (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/493f36c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/493f36c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/493f36c8

Branch: refs/heads/master
Commit: 493f36c899bc990de0400fbf777a2bf29c5c60e3
Parents: 2073954 7ddae39
Author: Jonathan M Hsieh <[email protected]>
Authored: Wed Jul 22 12:49:42 2015 -0700
Committer: Jonathan M Hsieh <[email protected]>
Committed: Wed Jul 22 12:51:11 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HColumnDescriptor.java  |   58 +-
 .../org/apache/hadoop/hbase/client/Admin.java   |   52 +
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |  102 ++
 .../hbase/ipc/AsyncServerResponseHandler.java   |    3 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |    2 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   38 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java.rej |   16 +
 .../hadoop/hbase/TestHColumnDescriptor.java     |   18 +
 .../java/org/apache/hadoop/hbase/TagType.java   |    5 +
 .../apache/hadoop/hbase/util/PrettyPrinter.java |   10 +
 .../src/main/resources/hbase-default.xml        |  117 ++
 .../regionserver/MetricsRegionServerSource.java |   32 +
 .../MetricsRegionServerWrapper.java             |   70 ++
 .../MetricsRegionServerSourceImpl.java          |   32 +
 .../hbase/IntegrationTestAcidGuarantees.java    |    3 +-
 .../hbase/IntegrationTestIngestWithMOB.java     |  156 +++
 .../hbase/chaos/actions/CompactMobAction.java   |   65 ++
 .../chaos/factories/MobNoKillMonkeyFactory.java |   67 ++
 .../MobSlowDeterministicMonkeyFactory.java      |  156 +++
 .../hbase/chaos/factories/MonkeyFactory.java    |    4 +
 .../org/apache/hadoop/hbase/io/HFileLink.java   |   23 +-
 .../hbase/io/hfile/HFilePrettyPrinter.java      |  100 +-
 .../master/ExpiredMobFileCleanerChore.java      |  107 ++
 .../org/apache/hadoop/hbase/master/HMaster.java |  101 ++
 .../hadoop/hbase/master/MasterFileSystem.java   |   22 +-
 .../hbase/master/MasterMobCompactionThread.java |  184 ++++
 .../hadoop/hbase/master/MasterRpcServices.java  |  104 ++
 .../hadoop/hbase/master/MobCompactionChore.java |   93 ++
 .../hbase/master/cleaner/HFileLinkCleaner.java  |    6 +
 .../procedure/AddColumnFamilyProcedure.java     |    4 +-
 .../procedure/DeleteColumnFamilyProcedure.java  |    6 +-
 .../master/procedure/DeleteTableProcedure.java  |   43 +-
 .../procedure/MasterDDLOperationHelper.java     |    5 +-
 .../master/procedure/ModifyTableProcedure.java  |    3 +-
 .../snapshot/DisabledTableSnapshotHandler.java  |    8 +
 .../master/snapshot/MasterSnapshotVerifier.java |   12 +-
 .../apache/hadoop/hbase/mob/CachedMobFile.java  |  114 ++
 .../hbase/mob/DefaultMobStoreCompactor.java     |  300 +++++
 .../hbase/mob/DefaultMobStoreFlusher.java       |  226 ++++
 .../hadoop/hbase/mob/ExpiredMobFileCleaner.java |  120 ++
 .../apache/hadoop/hbase/mob/MobCacheConfig.java |   63 ++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  121 +++
 .../org/apache/hadoop/hbase/mob/MobFile.java    |  152 +++
 .../apache/hadoop/hbase/mob/MobFileCache.java   |  325 ++++++
 .../apache/hadoop/hbase/mob/MobFileName.java    |  169 +++
 .../apache/hadoop/hbase/mob/MobStoreEngine.java |   48 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  898 +++++++++++++++
 .../mob/compactions/MobCompactionRequest.java   |   64 ++
 .../hbase/mob/compactions/MobCompactor.java     |   90 ++
 .../PartitionedMobCompactionRequest.java        |  146 +++
 .../compactions/PartitionedMobCompactor.java    |  679 ++++++++++++
 .../hbase/mob/mapreduce/MemStoreWrapper.java    |  185 ++++
 .../mapreduce/MobFilePathHashPartitioner.java   |   41 +
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    |  603 ++++++++++
 .../mob/mapreduce/SweepJobNodeTracker.java      |  100 ++
 .../hadoop/hbase/mob/mapreduce/SweepMapper.java |   87 ++
 .../hbase/mob/mapreduce/SweepReducer.java       |  472 ++++++++
 .../hadoop/hbase/mob/mapreduce/Sweeper.java     |  119 ++
 .../hbase/regionserver/DefaultStoreEngine.java  |   20 +-
 .../hbase/regionserver/DefaultStoreFlusher.java |    2 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |  585 ++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      |   30 +-
 .../hbase/regionserver/HRegionServer.java       |    6 +
 .../hadoop/hbase/regionserver/HStore.java       |   58 +-
 .../MetricsRegionServerWrapperImpl.java         |  145 ++-
 .../regionserver/MobCompactionStoreScanner.java |   66 ++
 .../regionserver/MobReferenceOnlyFilter.java    |   42 +
 .../hbase/regionserver/MobStoreScanner.java     |   85 ++
 .../regionserver/ReversedMobStoreScanner.java   |   85 ++
 .../hadoop/hbase/regionserver/StoreFile.java    |   55 +-
 .../hbase/regionserver/StoreFileInfo.java       |   30 +-
 .../hbase/regionserver/StoreFileScanner.java    |    2 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |    6 +-
 .../compactions/CompactionRequest.java          |   19 +
 .../regionserver/compactions/Compactor.java     |   34 +-
 .../compactions/DefaultCompactor.java           |   31 +-
 .../compactions/StripeCompactor.java            |    3 +-
 .../hadoop/hbase/snapshot/ExportSnapshot.java   |   23 +-
 .../hbase/snapshot/RestoreSnapshotHelper.java   |   90 +-
 .../hadoop/hbase/snapshot/SnapshotInfo.java     |   49 +-
 .../hadoop/hbase/snapshot/SnapshotManifest.java |  171 ++-
 .../hbase/snapshot/SnapshotReferenceUtil.java   |   12 +-
 .../hadoop/hbase/util/HFileArchiveUtil.java     |    4 +-
 .../hbase/util/hbck/HFileCorruptionChecker.java |  197 ++++
 .../apache/hadoop/hbase/TestAcidGuarantees.java |   66 +-
 .../client/TestMobCloneSnapshotFromClient.java  |  252 +++++
 .../TestMobRestoreSnapshotFromClient.java       |  306 ++++++
 .../TestMobSnapshotCloneIndependence.java       |  435 ++++++++
 .../hbase/client/TestMobSnapshotFromClient.java |  305 ++++++
 .../apache/hadoop/hbase/mob/MobTestUtil.java    |   87 ++
 .../hadoop/hbase/mob/TestCachedMobFile.java     |  154 +++
 .../hbase/mob/TestDefaultMobStoreFlusher.java   |  190 ++++
 .../hbase/mob/TestExpiredMobFileCleaner.java    |  175 +++
 .../hbase/mob/TestMobDataBlockEncoding.java     |  135 +++
 .../apache/hadoop/hbase/mob/TestMobFile.java    |  124 +++
 .../hadoop/hbase/mob/TestMobFileCache.java      |  202 ++++
 .../hadoop/hbase/mob/TestMobFileLink.java       |   66 ++
 .../hadoop/hbase/mob/TestMobFileName.java       |   79 ++
 .../hbase/mob/compactions/TestMobCompactor.java | 1026 ++++++++++++++++++
 .../TestPartitionedMobCompactionRequest.java    |   60 +
 .../TestPartitionedMobCompactor.java            |  440 ++++++++
 .../hbase/mob/mapreduce/TestMobSweepJob.java    |  169 +++
 .../hbase/mob/mapreduce/TestMobSweepMapper.java |  120 ++
 .../mob/mapreduce/TestMobSweepReducer.java      |  219 ++++
 .../hbase/mob/mapreduce/TestMobSweeper.java     |  368 +++++++
 .../MetricsRegionServerWrapperStub.java         |   70 ++
 .../hbase/regionserver/TestDeleteMobTable.java  |  270 +++++
 .../hbase/regionserver/TestHMobStore.java       |  557 ++++++++++
 .../regionserver/TestMobStoreCompaction.java    |  460 ++++++++
 .../hbase/regionserver/TestMobStoreScanner.java |  512 +++++++++
 .../regionserver/TestRegionServerMetrics.java   |   64 ++
 .../hbase/regionserver/TestStoreFileInfo.java   |    6 +-
 .../hbase/snapshot/MobSnapshotTestingUtils.java |  347 ++++++
 .../hbase/snapshot/SnapshotTestingUtils.java    |    9 +-
 .../hbase/snapshot/TestMobExportSnapshot.java   |  439 ++++++++
 .../TestMobFlushSnapshotFromClient.java         |  547 ++++++++++
 .../TestMobRestoreFlushSnapshotFromClient.java  |  211 ++++
 .../snapshot/TestMobRestoreSnapshotHelper.java  |  160 +++
 .../snapshot/TestMobSecureExportSnapshot.java   |   53 +
 .../util/LoadTestDataGeneratorWithMOB.java      |   73 ++
 .../apache/hadoop/hbase/util/LoadTestTool.java  |   17 +
 .../apache/hadoop/hbase/util/TestHBaseFsck.java |  109 ++
 hbase-shell/src/main/ruby/hbase/admin.rb        |   24 +
 hbase-shell/src/main/ruby/shell.rb              |    2 +
 .../src/main/ruby/shell/commands/compact_mob.rb |   42 +
 .../ruby/shell/commands/major_compact_mob.rb    |   42 +
 src/main/asciidoc/_chapters/hbase_mob.adoc      |  236 ++++
 src/main/asciidoc/book.adoc                     |    1 +
 128 files changed, 17864 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 36f6fb5,d1c44b4..0de2762
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@@ -165,8 -169,8 +168,8 @@@ import com.google.protobuf.TextFormat
  /**
   * Protobufs utility.
   */
[email protected](value="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED",
--  justification="None. Address sometime.")
[email protected](
++  value="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED", justification="None. Address sometime.")
  @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class
  public final class ProtobufUtil {
  
@@@ -1585,7 -1623,7 +1588,7 @@@
    throws IOException {
      CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
          .setCall(call).setRegion(
--            RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build();
++           RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build();
      try {
        CoprocessorServiceResponse response =
            client.execMasterService(null, request);
@@@ -1887,7 -1925,7 +1890,8 @@@
      if (proto.getType() != AccessControlProtos.Permission.Type.Global) {
        return toTablePermission(proto);
      } else {
--      List<Permission.Action> actions = toPermissionActions(proto.getGlobalPermission().getActionList());
++      List<Permission.Action> actions = toPermissionActions(
++        proto.getGlobalPermission().getActionList());
        return new Permission(actions.toArray(new Permission.Action[actions.size()]));
      }
    }
@@@ -1954,7 -1992,7 +1958,7 @@@
          AccessControlProtos.NamespacePermission.Builder builder =
            AccessControlProtos.NamespacePermission.newBuilder();
          builder.setNamespaceName(ByteString.copyFromUtf8(tablePerm.getNamespace()));
--        Permission.Action actions[] = perm.getActions();
++        Permission.Action[] actions = perm.getActions();
          if (actions != null) {
            for (Permission.Action a : actions) {
              builder.addAction(toPermissionAction(a));
@@@ -2997,6 -3027,29 +3001,30 @@@
      return desc.build();
    }
  
+   /**
+    * This version of protobuf's mergeDelimitedFrom avoids the hard-coded 64MB limit for decoding
+    * buffers
+    * @param builder current message builder
+    * @param in InputStream with delimited protobuf data
+    * @throws IOException
+    */
 -  public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException {
++  public static void mergeDelimitedFrom(Message.Builder builder, InputStream in)
++    throws IOException {
+     // This used to be builder.mergeDelimitedFrom(in);
+     // but is replaced to allow us to bump the protobuf size limit.
+     final int firstByte = in.read();
+     if (firstByte == -1) {
+       // bail out. (was return false;)
+     } else {
+       final int size = CodedInputStream.readRawVarint32(firstByte, in);
+       final InputStream limitedInput = new LimitInputStream(in, size);
+       final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
+       codedInput.setSizeLimit(size);
+       builder.mergeFrom(codedInput);
+       codedInput.checkLastTagWas(0);
+     }
+   }
+ 
    public static ReplicationLoadSink toReplicationLoadSink(
        ClusterStatusProtos.ReplicationLoadSink cls) {
      return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
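
Note: the new ProtobufUtil.mergeDelimitedFrom above reads the varint length
prefix itself, wraps the stream in a LimitInputStream, and raises the
CodedInputStream size limit to the message size, so delimited messages larger
than protobuf's default 64MB cap can be parsed. A minimal caller sketch; the
file path is assumed, and SnapshotRegionManifest is just one illustrative
message type:

  import java.io.FileInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

  public class ManifestReader {
    // Reads one length-delimited manifest that may exceed the 64MB default
    // limit enforced by Message.Builder#mergeDelimitedFrom.
    public static SnapshotRegionManifest read(String path) throws IOException {
      try (InputStream in = new FileInputStream(path)) {
        SnapshotRegionManifest.Builder builder = SnapshotRegionManifest.newBuilder();
        ProtobufUtil.mergeDelimitedFrom(builder, in);
        return builder.build();
      }
    }
  }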

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej
----------------------------------------------------------------------
diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej
index 0000000,0000000..7cd2dad
new file mode 100644
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej
@@@ -1,0 -1,0 +1,16 @@@
++--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
++@@ -124,12 +126,12 @@
++ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
++ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+++import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
++-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
++ import org.apache.hadoop.hbase.quotas.QuotaScope;
++ import org.apache.hadoop.hbase.quotas.QuotaType;

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --cc hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index fab2a49,26c55bb..2ae8f4d
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@@ -259,6 -259,36 +259,38 @@@ public class MetricsRegionServerSourceI
            .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
                rsWrap.getMajorCompactedCellsSize())
  
 -          .addCounter(Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC),
++          .addCounter(
++              Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC),
+               rsWrap.getCellsCountCompactedFromMob())
+           .addCounter(Interns.info(CELLS_COUNT_COMPACTED_TO_MOB, CELLS_COUNT_COMPACTED_TO_MOB_DESC),
+               rsWrap.getCellsCountCompactedToMob())
 -          .addCounter(Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC),
++          .addCounter(
++              Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC),
+               rsWrap.getCellsSizeCompactedFromMob())
+           .addCounter(Interns.info(CELLS_SIZE_COMPACTED_TO_MOB, CELLS_SIZE_COMPACTED_TO_MOB_DESC),
+               rsWrap.getCellsSizeCompactedToMob())
+           .addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC),
+               rsWrap.getMobFlushCount())
+           .addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, MOB_FLUSHED_CELLS_COUNT_DESC),
+               rsWrap.getMobFlushedCellsCount())
+           .addCounter(Interns.info(MOB_FLUSHED_CELLS_SIZE, MOB_FLUSHED_CELLS_SIZE_DESC),
+               rsWrap.getMobFlushedCellsSize())
+           .addCounter(Interns.info(MOB_SCAN_CELLS_COUNT, MOB_SCAN_CELLS_COUNT_DESC),
+               rsWrap.getMobScanCellsCount())
+           .addCounter(Interns.info(MOB_SCAN_CELLS_SIZE, MOB_SCAN_CELLS_SIZE_DESC),
+               rsWrap.getMobScanCellsSize())
+           .addGauge(Interns.info(MOB_FILE_CACHE_COUNT, MOB_FILE_CACHE_COUNT_DESC),
+               rsWrap.getMobFileCacheCount())
+           .addCounter(Interns.info(MOB_FILE_CACHE_ACCESS_COUNT, MOB_FILE_CACHE_ACCESS_COUNT_DESC),
+               rsWrap.getMobFileCacheAccessCount())
+           .addCounter(Interns.info(MOB_FILE_CACHE_MISS_COUNT, MOB_FILE_CACHE_MISS_COUNT_DESC),
+               rsWrap.getMobFileCacheMissCount())
+           .addCounter(
+               Interns.info(MOB_FILE_CACHE_EVICTED_COUNT, MOB_FILE_CACHE_EVICTED_COUNT_DESC),
+               rsWrap.getMobFileCacheEvictedCount())
+           .addGauge(Interns.info(MOB_FILE_CACHE_HIT_PERCENT, MOB_FILE_CACHE_HIT_PERCENT_DESC),
+               rsWrap.getMobFileCacheHitPercent())
+ 
            .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
            .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
                rsWrap.getHedgedReadWins())

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java
----------------------------------------------------------------------
diff --cc hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java
index 0000000,f832a2e..ba7f67f
mode 000000,100644..100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java
@@@ -1,0 -1,70 +1,67 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.chaos.factories;
+ 
+ import org.apache.hadoop.hbase.chaos.actions.*;
+ import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+ import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+ import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+ import org.apache.hadoop.hbase.chaos.policies.TwoConcurrentActionPolicy;
+ 
+ /**
+  * This is a copy of NoKillMonkeyFactory that also does mob compactions.
+  */
+ public class MobNoKillMonkeyFactory extends MonkeyFactory {
+   @Override public ChaosMonkey build() {
+     Action[] actions1 = new Action[] {
 -            new CompactMobAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD),
 -            new CompactTableAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD),
 -            new CompactRandomRegionOfTableAction(tableName,
 -                    MonkeyConstants.DEFAULT_COMPACT_RANDOM_REGION_RATIO),
 -            new FlushTableAction(tableName),
 -            new FlushRandomRegionOfTableAction(tableName),
 -            new MoveRandomRegionOfTableAction(tableName)
++      new CompactMobAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD),
++      new CompactTableAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD),
++      new CompactRandomRegionOfTableAction(tableName,
++        MonkeyConstants.DEFAULT_COMPACT_RANDOM_REGION_RATIO),
++      new FlushTableAction(tableName),
++      new FlushRandomRegionOfTableAction(tableName),
++      new MoveRandomRegionOfTableAction(tableName)
+     };
+ 
+     Action[] actions2 = new Action[] {
 -            new SplitRandomRegionOfTableAction(tableName),
 -            new MergeRandomAdjacentRegionsOfTableAction(tableName),
 -            new SnapshotTableAction(tableName),
 -            new AddColumnAction(tableName),
 -            new RemoveColumnAction(tableName, columnFamilies),
 -            new ChangeEncodingAction(tableName),
 -            new ChangeCompressionAction(tableName),
 -            new ChangeBloomFilterAction(tableName),
 -            new ChangeVersionsAction(tableName)
++      new SplitRandomRegionOfTableAction(tableName),
++      new MergeRandomAdjacentRegionsOfTableAction(tableName),
++      new SnapshotTableAction(tableName),
++      new AddColumnAction(tableName),
++      new RemoveColumnAction(tableName, columnFamilies),
++      new ChangeEncodingAction(tableName),
++      new ChangeCompressionAction(tableName),
++      new ChangeBloomFilterAction(tableName),
++      new ChangeVersionsAction(tableName)
+     };
+ 
+     Action[] actions3 = new Action[] {
 -            new MoveRegionsOfTableAction(MonkeyConstants.DEFAULT_MOVE_REGIONS_SLEEP_TIME,
 -                    MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME,
 -                    tableName),
 -            new MoveRandomRegionOfTableAction(MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME,
 -                    tableName),
 -    };
++      new MoveRegionsOfTableAction(MonkeyConstants.DEFAULT_MOVE_REGIONS_SLEEP_TIME,
++        MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME, tableName),
++      new MoveRandomRegionOfTableAction(MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME,
++        tableName), };
+ 
 -    Action[] actions4 = new Action[] {
 -            new DumpClusterStatusAction()
 -    };
++    Action[] actions4 = new Action[] { new DumpClusterStatusAction() };
+ 
+     return new PolicyBasedChaosMonkey(util,
 -            new TwoConcurrentActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD, actions1, actions2),
 -            new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD,actions3),
 -            new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION4_PERIOD,actions4));
++      new TwoConcurrentActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD, actions1,
++        actions2),
++      new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD,actions3),
++      new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION4_PERIOD,actions4));
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
----------------------------------------------------------------------
diff --cc hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
index 2f65251,f3d535b..069e8af
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
@@@ -69,7 -69,8 +69,9 @@@ public abstract class MonkeyFactory 
    public static final String SERVER_KILLING = "serverKilling";
    public static final String STRESS_AM = "stressAM";
    public static final String NO_KILL = "noKill";
 +  public static final String MASTER_KILLING = "masterKilling";
+   public static final String MOB_NO_KILL = "mobNoKill";
+   public static final String MOB_SLOW_DETERMINISTIC = "mobSlowDeterministic";
  
    public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder()
      .put(CALM, new CalmMonkeyFactory())
@@@ -78,7 -79,8 +80,9 @@@
      .put(SERVER_KILLING, new ServerKillingMonkeyFactory())
      .put(STRESS_AM, new StressAssignmentManagerMonkeyFactory())
      .put(NO_KILL, new NoKillMonkeyFactory())
 +    .put(MASTER_KILLING, new MasterKillingMonkeyFactory())
+     .put(MOB_NO_KILL, new MobNoKillMonkeyFactory())
+     .put(MOB_SLOW_DETERMINISTIC, new MobSlowDeterministicMonkeyFactory())
      .build();
  
    public static MonkeyFactory getFactory(String factoryName) {
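
Note: the two new factory names are registered alongside the existing ones, so
an integration test can obtain the mob chaos monkey through the usual lookup.
A hedged sketch; the builder-style setUtil/setTableName/setColumnFamilies
setters are assumed from how MonkeyFactory subclasses consume those fields:

  import org.apache.hadoop.hbase.IntegrationTestingUtility;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
  import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
  import com.google.common.collect.ImmutableSet;

  public class MobChaosExample {
    public static void main(String[] args) throws Exception {
      IntegrationTestingUtility util = new IntegrationTestingUtility();
      // Look up the new mob monkey by its registered name ("mobNoKill").
      MonkeyFactory factory = MonkeyFactory.getFactory(MonkeyFactory.MOB_NO_KILL)
          .setUtil(util)                                        // assumed setters
          .setTableName(TableName.valueOf("IntegrationTestIngestWithMOB"))
          .setColumnFamilies(ImmutableSet.of("test_cf"));
      ChaosMonkey monkey = factory.build();
      monkey.start(); // runs the configured action policies until stop()
    }
  }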

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 025053e,dc12762..2818d88
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@@ -313,8 -332,11 +332,11 @@@ public class HFilePrettyPrinter extend
    private void scanKeysValues(Path file, KeyValueStatsCollector fileStats,
        HFileScanner scanner,  byte[] row) throws IOException {
      Cell pCell = null;
+     FileSystem fs = FileSystem.get(getConf());
+     Set<String> foundMobFiles = new LinkedHashSet<String>(FOUND_MOB_FILES_CACHE_CAPACITY);
+     Set<String> missingMobFiles = new LinkedHashSet<String>(MISSING_MOB_FILES_CACHE_CAPACITY);
      do {
 -      Cell cell = scanner.getKeyValue();
 +      Cell cell = scanner.getCell();
        if (row != null && row.length != 0) {
          int result = CellComparator.COMPARATOR.compareRows(cell, row, 0, row.length);
          if (result > 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index 0000000,7ca3362..8c9bc2b
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@@ -1,0 -1,107 +1,107 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hbase.master;
+ 
+ import java.io.IOException;
+ import java.util.Map;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.ScheduledChore;
+ import org.apache.hadoop.hbase.TableDescriptors;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
+ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ 
+ /**
+  * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
+  * mob files.
+  */
+ @InterfaceAudience.Private
+ public class ExpiredMobFileCleanerChore extends ScheduledChore {
+ 
+   private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class);
+   private final HMaster master;
+   private TableLockManager tableLockManager;
+   private ExpiredMobFileCleaner cleaner;
+ 
+   public ExpiredMobFileCleanerChore(HMaster master) {
+     super(master.getServerName() + "-ExpiredMobFileCleanerChore", master, master.getConfiguration()
+       .getInt(MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master
+       .getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+         MobConstants.DEFAULT_MOB_CLEANER_PERIOD), TimeUnit.SECONDS);
+     this.master = master;
+     this.tableLockManager = master.getTableLockManager();
+     cleaner = new ExpiredMobFileCleaner();
+     cleaner.setConf(master.getConfiguration());
+   }
+ 
+   @Override
+   protected void chore() {
+     try {
+       TableDescriptors htds = master.getTableDescriptors();
+       Map<String, HTableDescriptor> map = htds.getAll();
+       for (HTableDescriptor htd : map.values()) {
+         for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+           if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
+             // clean only for mob-enabled column.
+             // obtain a read table lock before cleaning, synchronize with MobFileCompactionChore.
+             boolean tableLocked = false;
+             TableLock lock = null;
+             try {
+               // the tableLockManager might be null in testing. In that case, it is lock-free.
+               if (tableLockManager != null) {
+                 lock = tableLockManager.readLock(MobUtils.getTableLockName(htd.getTableName()),
+                   "Run ExpiredMobFileCleanerChore");
+                 lock.acquire();
+               }
+               tableLocked = true;
+               cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+             } catch (LockTimeoutException e) {
+               LOG.info("Failed to acquire the lock because of timeout, maybe a"
+                 + " MobFileCompactor is running", e);
+             } catch (Exception e) {
+               LOG.error(
+                 "Failed to clean the expired mob files for the column " + hcd.getNameAsString()
+                   + " in the table " + htd.getNameAsString(), e);
+             } finally {
+               if (lock != null && tableLocked) {
+                 try {
+                   lock.release();
+                 } catch (IOException e) {
+                   LOG.error(
+                     "Failed to release the read lock for the table " + htd.getNameAsString(), e);
+                 }
+               }
+             }
+           }
+         }
+       }
+     } catch (Exception e) {
+       LOG.error("Failed to clean the expired mob files", e);
+     }
+   }
+ 
+ }
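
Note: the chore only acts on column families that are mob-enabled and have
MIN_VERSIONS == 0, and expiry follows the family TTL. A hedged sketch of a
descriptor the cleaner would process (the table and family names are made up;
setMobEnabled/setMobThreshold are the HColumnDescriptor additions listed in
the diffstat):

  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;

  public class MobTableExample {
    public static HTableDescriptor mobTable() {
      HColumnDescriptor hcd = new HColumnDescriptor("mob_cf");
      hcd.setMobEnabled(true);          // cells above the threshold go to mob files
      hcd.setMobThreshold(100 * 1024);  // 100KB mob threshold
      hcd.setTimeToLive(3 * 24 * 3600); // expired cells make mob files eligible for cleaning
      hcd.setMinVersions(0);            // the chore skips families with minVersions > 0
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t_mob"));
      htd.addFamily(hcd);
      return htd;
    }
  }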

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bc52edb,6cc229a..f7d839b
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@@ -101,10 -102,8 +102,11 @@@ import org.apache.hadoop.hbase.master.p
  import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
  import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
  import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
 +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
 +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
  import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+ import org.apache.hadoop.hbase.mob.MobConstants;
  import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@@ -789,11 -793,20 +801,26 @@@ public class HMaster extends HRegionSer
    // master initialization. See HBASE-5916.
      this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
  
 +    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
 +    status.setStatus("Checking ZNode ACLs");
 +    zooKeeper.checkAndSetZNodeAcls();
 +
 +    status.setStatus("Calling postStartMaster coprocessors");
++
+     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
+     getChoreService().scheduleChore(expiredMobFileCleanerChore);
+ 
+     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
+       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
+     if (mobCompactionPeriod > 0) {
+       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
+       getChoreService().scheduleChore(mobCompactChore);
+     } else {
+       LOG
+         .info("The period is " + mobCompactionPeriod + " seconds, MobCompactionChore is disabled");
+     }
+     this.mobCompactThread = new MasterMobCompactionThread(this);
+ 
      if (this.cpHost != null) {
        // don't let cp initialization errors kill the master
        try {
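
Note: both mob chores read their periods (in seconds) from the configuration
through MobConstants, and a non-positive compaction period disables
MobCompactionChore, as the hunk above shows. A small sketch of tuning the
periods programmatically (the one-day values are arbitrary examples):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mob.MobConstants;

  public class MobChoreConfig {
    public static Configuration tuned() {
      Configuration conf = HBaseConfiguration.create();
      // Run the mob compaction chore once a day; <= 0 disables the chore.
      conf.setInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 24 * 60 * 60);
      // Run the expired-mob-file cleaner chore once a day as well.
      conf.setInt(MobConstants.MOB_CLEANER_PERIOD, 24 * 60 * 60);
      return conf;
    }
  }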

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3718a5a,5088901..9b95ae7
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@@ -615,6 -591,22 +617,24 @@@ public class MasterFileSystem 
              + ")");
        }
      }
++
+     // archive and delete mob files
+     if (hasMob) {
+       Path mobTableDir =
+           FSUtils.getTableDir(new Path(getRootDir(), MobConstants.MOB_DIR_NAME), region.getTable());
+       HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(region.getTable());
+       Path mobFamilyDir =
+           new Path(mobTableDir,
+               new Path(mobRegionInfo.getEncodedName(), Bytes.toString(familyName)));
+       // archive mob family store files
+       MobUtils.archiveMobStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, familyName);
++
+       if (!fs.delete(mobFamilyDir, true)) {
+         throw new IOException("Could not delete mob store files for family "
+             + Bytes.toString(familyName) + " from FileSystem region "
+             + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")");
+       }
+     }
    }
  
    public void stop() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
index 0000000,d1f58ba..f8a5c15
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
@@@ -1,0 -1,184 +1,184 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.master;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.RejectedExecutionException;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ 
+ /**
+  * The mob compaction thread used in {@link MasterRpcServices}
+  */
+ @InterfaceAudience.Private
+ public class MasterMobCompactionThread {
+   static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class);
+   private final HMaster master;
+   private final Configuration conf;
+   private final ExecutorService mobCompactorPool;
+   private final ExecutorService masterMobPool;
+ 
+   public MasterMobCompactionThread(HMaster master) {
+     this.master = master;
+     this.conf = master.getConfiguration();
+     final String n = Thread.currentThread().getName();
+     // this pool is used to run the mob compaction
+     this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
+       new SynchronousQueue<Runnable>(), new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+           Thread t = new Thread(r);
+           t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime());
+           return t;
+         }
+       });
+     ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
+     // this pool is used in the mob compaction to compact the mob files by partitions
+     // in parallel
+     this.mobCompactorPool = MobUtils
+       .createMobCompactorThreadPool(master.getConfiguration());
+   }
+ 
+   /**
+    * Requests mob compaction
+    * @param conf The Configuration
+    * @param fs The file system
+    * @param tableName The table to compact
+    * @param columns The column descriptors
+    * @param tableLockManager The table lock manager
+    * @param allFiles Whether to add all mob files into the compaction.
+    */
+   public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
+     List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles)
+     throws IOException {
+     master.reportMobCompactionStart(tableName);
+     try {
+       masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager,
+         allFiles, mobCompactorPool));
+     } catch (RejectedExecutionException e) {
+       // in case the request is rejected by the pool
+       try {
+         master.reportMobCompactionEnd(tableName);
+       } catch (IOException e1) {
+         LOG.error("Failed to mark end of mob compaction", e1);
+       }
+       throw e;
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("The mob compaction is requested for the columns " + columns
+         + " of the table " + tableName.getNameAsString());
+     }
+   }
+ 
+   private class CompactionRunner implements Runnable {
+     private FileSystem fs;
+     private TableName tableName;
+     private List<HColumnDescriptor> hcds;
+     private TableLockManager tableLockManager;
+     private boolean allFiles;
+     private ExecutorService pool;
+ 
+     public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
+       TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) {
+       super();
+       this.fs = fs;
+       this.tableName = tableName;
+       this.hcds = hcds;
+       this.tableLockManager = tableLockManager;
+       this.allFiles = allFiles;
+       this.pool = pool;
+     }
+ 
+     @Override
+     public void run() {
+       try {
+         for (HColumnDescriptor hcd : hcds) {
+           MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
+             allFiles);
+         }
+       } catch (IOException e) {
+         LOG.error("Failed to perform the mob compaction", e);
+       } finally {
+         try {
+           master.reportMobCompactionEnd(tableName);
+         } catch (IOException e) {
+           LOG.error("Failed to mark end of mob compaction", e);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Only interrupt once it's done with a run through the work loop.
+    */
+   private void interruptIfNecessary() {
+     mobCompactorPool.shutdown();
+     masterMobPool.shutdown();
+   }
+ 
+   /**
+    * Wait for all the threads to finish.
+    */
+   private void join() {
+     waitFor(mobCompactorPool, "Mob Compaction Thread");
+     waitFor(masterMobPool, "Region Server Mob Compaction Thread");
+   }
+ 
+   /**
+    * Closes the MasterMobCompactionThread.
+    */
+   public void close() {
+     interruptIfNecessary();
+     join();
+   }
+ 
+   /**
+    * Wait for the thread to finish.
+    * @param t the thread to wait for
+    * @param name the thread name.
+    */
+   private void waitFor(ExecutorService t, String name) {
+     boolean done = false;
+     while (!done) {
+       try {
+         done = t.awaitTermination(60, TimeUnit.SECONDS);
+         LOG.info("Waiting for " + name + " to finish...");
+         if (!done) {
+           t.shutdownNow();
+         }
+       } catch (InterruptedException ie) {
+         LOG.warn("Interrupted waiting for " + name + " to finish...");
+       }
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index c828880,7d025db..302d215
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@@ -1394,6 -1365,109 +1400,104 @@@ public class MasterRpcServices extends 
      return response.build();
    }
  
+   /**
+    * Compact a region on the master.
+    *
+    * @param controller the RPC controller
+    * @param request the request
+    * @throws ServiceException
+    */
+   @Override
+   @QosPriority(priority=HConstants.ADMIN_QOS)
+   public CompactRegionResponse compactRegion(final RpcController controller,
+     final CompactRegionRequest request) throws ServiceException {
+     try {
+       master.checkInitialized();
+       byte[] regionName = request.getRegion().getValue().toByteArray();
+       TableName tableName = HRegionInfo.getTable(regionName);
+       // if the region is a mob region, do the mob file compaction.
+       if (MobUtils.isMobRegionName(tableName, regionName)) {
+         return compactMob(request, tableName);
+       } else {
+         return super.compactRegion(controller, request);
+       }
+     } catch (IOException ie) {
+       throw new ServiceException(ie);
+     }
+   }
+ 
+   @Override
+   @QosPriority(priority=HConstants.ADMIN_QOS)
+   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
+     final GetRegionInfoRequest request) throws ServiceException {
 -    try {
 -      master.checkInitialized();
 -      byte[] regionName = request.getRegion().getValue().toByteArray();
 -      TableName tableName = HRegionInfo.getTable(regionName);
 -      if (MobUtils.isMobRegionName(tableName, regionName)) {
 -        // a dummy region info contains the compaction state.
 -        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName);
 -        GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
 -        builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo));
 -        if (request.hasCompactionState() && request.getCompactionState()) {
 -          builder.setCompactionState(master.getMobCompactionState(tableName));
 -        }
 -        return builder.build();
 -      } else {
 -        return super.getRegionInfo(controller, request);
++    byte[] regionName = request.getRegion().getValue().toByteArray();
++    TableName tableName = HRegionInfo.getTable(regionName);
++    if (MobUtils.isMobRegionName(tableName, regionName)) {
++      // a dummy region info contains the compaction state.
++      HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName);
++      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
++      builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo));
++      if (request.hasCompactionState() && request.getCompactionState()) {
++        builder.setCompactionState(master.getMobCompactionState(tableName));
+       }
 -    } catch (IOException ie) {
 -      throw new ServiceException(ie);
++      return builder.build();
++    } else {
++      return super.getRegionInfo(controller, request);
+     }
+   }
+ 
+   /**
+    * Compacts the mob files in the current table.
+    * @param request the request.
+    * @param tableName the current table name.
+    * @return The response of the mob file compaction.
+    * @throws IOException
+    */
+   private CompactRegionResponse compactMob(final CompactRegionRequest request,
+     TableName tableName) throws IOException {
+     if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) {
+       throw new DoNotRetryIOException("Table " + tableName + " is not enabled");
+     }
+     boolean allFiles = false;
+     List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>();
+     HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies();
+     byte[] family = null;
+     if (request.hasFamily()) {
+       family = request.getFamily().toByteArray();
+       for (HColumnDescriptor hcd : hcds) {
+         if (Bytes.equals(family, hcd.getName())) {
+           if (!hcd.isMobEnabled()) {
+             LOG.error("Column family " + hcd.getNameAsString() + " is not a mob column family");
+             throw new DoNotRetryIOException("Column family " + hcd.getNameAsString()
+                     + " is not a mob column family");
+           }
+           compactedColumns.add(hcd);
+         }
+       }
+     } else {
+       for (HColumnDescriptor hcd : hcds) {
+         if (hcd.isMobEnabled()) {
+           compactedColumns.add(hcd);
+         }
+       }
+     }
+     if (compactedColumns.isEmpty()) {
+       LOG.error("No mob column families are assigned in the mob compaction");
+       throw new DoNotRetryIOException(
+               "No mob column families are assigned in the mob compaction");
+     }
+     if (request.hasMajor() && request.getMajor()) {
+       allFiles = true;
+     }
+     String familyLogMsg = (family != null) ? Bytes.toString(family) : "";
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("User-triggered mob compaction requested for table: "
+               + tableName.getNameAsString() + " for column family: " + familyLogMsg);
+     }
+     master.requestMobCompaction(tableName, compactedColumns, allFiles);
+     return CompactRegionResponse.newBuilder().build();
+   }
+ 
    @Override
    public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
        IsBalancerEnabledRequest request) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
index 0000000,f122955..4b956e6
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java
@@@ -1,0 -1,93 +1,93 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.master;
+ 
+ import java.util.Map;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.ScheduledChore;
+ import org.apache.hadoop.hbase.TableDescriptors;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.TableState;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ 
+ /**
+  * The Class MobCompactionChore for running compaction regularly to merge small mob files.
+  */
+ @InterfaceAudience.Private
+ public class MobCompactionChore extends ScheduledChore {
+ 
+   private static final Log LOG = LogFactory.getLog(MobCompactionChore.class);
+   private HMaster master;
+   private TableLockManager tableLockManager;
+   private ExecutorService pool;
+ 
+   public MobCompactionChore(HMaster master, int period) {
+     // use the period as initial delay.
+     super(master.getServerName() + "-MobCompactionChore", master, period, period, TimeUnit.SECONDS);
+     this.master = master;
+     this.tableLockManager = master.getTableLockManager();
+     this.pool = MobUtils.createMobCompactorThreadPool(master.getConfiguration());
+   }
+ 
+   @Override
+   protected void chore() {
+     try {
+       TableDescriptors htds = master.getTableDescriptors();
+       Map<String, HTableDescriptor> map = htds.getAll();
+       for (HTableDescriptor htd : map.values()) {
+         if (!master.getTableStateManager().isTableState(htd.getTableName(),
+           TableState.State.ENABLED)) {
+           continue;
+         }
+         boolean reported = false;
+         try {
+           for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+             if (!hcd.isMobEnabled()) {
+               continue;
+             }
+             if (!reported) {
+               master.reportMobCompactionStart(htd.getTableName());
+               reported = true;
+             }
+             MobUtils.doMobCompaction(master.getConfiguration(), master.getFileSystem(),
+               htd.getTableName(), hcd, pool, tableLockManager, false);
+           }
+         } finally {
+           if (reported) {
+             master.reportMobCompactionEnd(htd.getTableName());
+           }
+         }
+       }
+     } catch (Exception e) {
+       LOG.error("Failed to compact mob files", e);
+     }
+   }
+ 
+   @Override
+   protected void cleanup() {
+     super.cleanup();
+     pool.shutdown();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 6e52ec2,fb9754a..75edfab
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@@ -284,11 -284,8 +285,14 @@@ public class DeleteColumnFamilyProcedur
        throw new InvalidFamilyOperationException("Family '" + getColumnFamilyName()
           + "' does not exist, so it cannot be deleted");
      }
 +
 +    if (unmodifiedHTableDescriptor.getColumnFamilies().length == 1) {
 +      throw new InvalidFamilyOperationException("Family '" + getColumnFamilyName()
 +        + "' is the only column family in the table, so it cannot be deleted");
 +    }
++
+     // whether mob family
+     hasMob = unmodifiedHTableDescriptor.getFamily(familyName).isMobEnabled();
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index 60212e8,45b7b51..c0a5091
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@@ -18,8 -18,8 +18,8 @@@
  
  package org.apache.hadoop.hbase.master.procedure;
  
--import java.io.InputStream;
  import java.io.IOException;
++import java.io.InputStream;
  import java.io.OutputStream;
  import java.security.PrivilegedExceptionAction;
  import java.util.ArrayList;
@@@ -27,16 -27,12 +27,17 @@@ import java.util.List
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.hbase.*;
--import org.apache.hadoop.hbase.classification.InterfaceAudience;
--import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hbase.HRegionInfo;
++import org.apache.hadoop.hbase.HTableDescriptor;
++import org.apache.hadoop.hbase.MetaTableAccessor;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.TableNotDisabledException;
 +import org.apache.hadoop.hbase.TableNotFoundException;
- import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.backup.HFileArchiver;
- import org.apache.hadoop.hbase.MetaTableAccessor;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
  import org.apache.hadoop.hbase.client.ClusterConnection;
  import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.client.Result;
@@@ -44,15 -40,17 +45,17 @@@ import org.apache.hadoop.hbase.client.R
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.exceptions.HBaseException;
 -import org.apache.hadoop.hbase.mob.MobConstants;
 -import org.apache.hadoop.hbase.mob.MobUtils;
--import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.master.AssignmentManager;
  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
  import org.apache.hadoop.hbase.master.MasterFileSystem;
++import org.apache.hadoop.hbase.mob.MobConstants;
++import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
++import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
  import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
  import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
--import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
--import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
++import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.util.FSUtils;
  import org.apache.hadoop.security.UserGroupInformation;
  
@@@ -403,8 -425,6 +428,8 @@@ public class DeleteTableProcedur
  
  protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName)
      throws IOException {
 -    ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName);
 +    if (!tableName.isSystemTable()) {
 +      ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName);
 +    }
    }
- }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
index 0000000,457eb6c..7c4d6fe
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@@ -1,0 -1,114 +1,114 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ 
+ /**
+  * Cached mob file.
+  */
+ @InterfaceAudience.Private
+ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> {
+ 
+   private long accessCount;
+   private AtomicLong referenceCount = new AtomicLong(0);
+ 
+   public CachedMobFile(StoreFile sf) {
+     super(sf);
+   }
+ 
+   public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
+       CacheConfig cacheConf) throws IOException {
+     StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+     return new CachedMobFile(sf);
+   }
+ 
+   public void access(long accessCount) {
+     this.accessCount = accessCount;
+   }
+ 
+   @Override
+   public int compareTo(CachedMobFile that) {
+     // Sort in descending order of access count: more recently accessed files come first.
+     if (this.accessCount == that.accessCount) return 0;
+     return this.accessCount < that.accessCount ? 1 : -1;
+   }
+ 
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     if (obj == null) {
+       return false;
+     }
+     if (!(obj instanceof CachedMobFile)) {
+       return false;
+     }
+     return compareTo((CachedMobFile) obj) == 0;
+   }
+ 
+   @Override
+   public int hashCode() {
+     return (int)(accessCount ^ (accessCount >>> 32));
+   }
+ 
+   /**
+    * Opens the mob file if it's not opened yet and increases the reference count.
+    * It's not thread-safe. Use MobFileCache.openFile() instead.
+    * The underlying reader is opened only once, no matter how many times open() is invoked.
+    * The reference count records how many times this reader is referenced; when it drops
+    * to 0, the reader is closed.
+    */
+   @Override
+   public void open() throws IOException {
+     super.open();
+     referenceCount.incrementAndGet();
+   }
+ 
+   /**
+    * Decreases the reference count of the underlying reader for the mob file.
+    * It's not thread-safe. Use MobFileCache.closeFile() instead.
+    * The underlying reader isn't closed until the reference count reaches 0.
+    */
+   @Override
+   public void close() throws IOException {
+     long refs = referenceCount.decrementAndGet();
+     if (refs == 0) {
+       super.close();
+     }
+   }
+ 
+   /**
+    * Gets the reference count of the current mob file.
+    * For internal use; currently it's only used in tests.
+    * @return The reference count of the current mob file.
+    */
+   public long getReferenceCount() {
+     return this.referenceCount.longValue();
+   }
+ }
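
A usage note on the class above: open() and close() must be balanced, since only the final close() releases the underlying reader. A minimal sketch, using only the API shown in this file (the fs, mobFilePath, conf and cacheConf handles are assumed to exist):

    // Sketch of the reference-counted open/close contract of CachedMobFile.
    static void demoReferenceCounting(FileSystem fs, Path mobFilePath,
        Configuration conf, CacheConfig cacheConf) throws IOException {
      CachedMobFile f = CachedMobFile.create(fs, mobFilePath, conf, cacheConf);
      f.open();    // opens the underlying reader, reference count becomes 1
      f.open();    // reuses the open reader, reference count becomes 2
      f.close();   // reference count drops to 1, reader stays open
      f.close();   // reference count drops to 0, reader is closed
    }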

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 0000000,c4d2a45..1670f7b
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@@ -1,0 -1,302 +1,300 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Date;
+ import java.util.List;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellUtil;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
 -import org.apache.hadoop.hbase.regionserver.*;
++import org.apache.hadoop.hbase.regionserver.HMobStore;
++import org.apache.hadoop.hbase.regionserver.HStore;
++import org.apache.hadoop.hbase.regionserver.InternalScanner;
++import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
++import org.apache.hadoop.hbase.regionserver.ScanType;
++import org.apache.hadoop.hbase.regionserver.ScannerContext;
++import org.apache.hadoop.hbase.regionserver.Store;
++import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
++import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+ import org.apache.hadoop.hbase.util.Bytes;
+ 
+ /**
+  * Compact passed set of files in the mob-enabled column family.
+  */
+ @InterfaceAudience.Private
+ public class DefaultMobStoreCompactor extends DefaultCompactor {
+ 
+   private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
+   private long mobSizeThreshold;
+   private HMobStore mobStore;
+   public DefaultMobStoreCompactor(Configuration conf, Store store) {
+     super(conf, store);
+     // The mob cells reside in the mob-enabled column family which is held by HMobStore.
+     // During the compaction, the compactor reads the cells from the mob files and
+     // probably creates new mob files. All of these operations are included in HMobStore,
+     // so we need to cast the Store to HMobStore.
+     if (!(store instanceof HMobStore)) {
+       throw new IllegalArgumentException("The store " + store + " is not an HMobStore");
+     }
+     mobStore = (HMobStore) store;
+     mobSizeThreshold = store.getFamily().getMobThreshold();
+   }
+ 
+   /**
+    * Creates a writer for a new file in a temporary directory.
+    * @param fd The file details.
+    * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+    * @return Writer for a new StoreFile in the tmp dir.
+    * @throws IOException
+    */
+   @Override
+   protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+     // Always create this writer with tags, because the compaction may produce new cells with tags.
+     StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+       true, true, true);
+     return writer;
+   }
+ 
+   @Override
+   protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+       ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+     Scan scan = new Scan();
+     scan.setMaxVersions(store.getFamily().getMaxVersions());
+     if (scanType == ScanType.COMPACT_DROP_DELETES) {
+       // In a major compaction, we need to write the delete markers to del files, so we have to
+       // retain them in scanning.
+       scanType = ScanType.COMPACT_RETAIN_DELETES;
+       return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+           scanType, smallestReadPoint, earliestPutTs, true);
+     } else {
+       return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+           scanType, smallestReadPoint, earliestPutTs, false);
+     }
+   }
+ 
+   // TODO refactor to take advantage of the throughput controller.
+ 
+   /**
+    * Performs compaction on a column family with the mob flag enabled.
+    * This is used when the mob threshold size has changed, or when the mob
+    * column family mode has been toggled via an alter table statement.
+    * Compacts the files by the following rules.
+    * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
+    * <ol>
+    * <li>
+    * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+    * directly copy the cell (with its mob tag) into the new store file.
+    * </li>
+    * <li>
+    * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
+    * the new store file.
+    * </li>
+    * </ol>
+    * 2. If the cell doesn't have a reference tag.
+    * <ol>
+    * <li>
+    * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+    * write this cell to a mob file, and write the path of this mob file to the store file.
+    * </li>
+    * <li>
+    * Otherwise, directly write this cell into the store file.
+    * </li>
+    * </ol>
+    * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
+    * which can output the normal cells and delete markers together when required.
+    * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
+    * deleted or old version mob refs, and the delete markers are written to a del file with the
+    * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
+    * The mob compaction doesn't take place on the normal hfiles; it occurs directly in the
+    * mob files. When the small mob files are merged into bigger ones, the del file is added into
+    * the scanner to filter the deleted cells.
+    * @param fd File details.
+    * @param scanner Where to read from.
+    * @param writer Where to write to.
+    * @param smallestReadPoint Smallest read point.
+    * @param cleanSeqId When true, remove the seqId (used to be mvcc) value which is <= smallestReadPoint.
+    * @param throughputController The compaction throughput controller.
+    * @param major Is a major compaction.
+    * @return Whether compaction ended; false if it was interrupted for any reason.
+    */
+   @Override
+   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+       long smallestReadPoint, boolean cleanSeqId,
+       CompactionThroughputController throughputController, boolean major) throws IOException {
+     if (!(scanner instanceof MobCompactionStoreScanner)) {
+       throw new IllegalArgumentException(
 -          "The scanner should be an instance of MobCompactionStoreScanner");
++        "The scanner should be an instance of MobCompactionStoreScanner");
+     }
+     MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
+     int bytesWritten = 0;
+     // Since scanner.next() can return 'false' but still be delivering data,
+     // we have to use a do/while loop.
+     List<Cell> cells = new ArrayList<Cell>();
+     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+     int closeCheckInterval = HStore.getCloseCheckInterval();
+     boolean hasMore;
+     Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+     byte[] fileName = null;
 -    StoreFile.Writer mobFileWriter = null;
 -    StoreFile.Writer delFileWriter = null;
 -    long mobCells = 0;
 -    long deleteMarkersCount = 0;
 -    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
 -            .getName());
 -    long cellsCountCompactedToMob = 0;
 -    long cellsCountCompactedFromMob = 0;
 -    long cellsSizeCompactedToMob = 0;
 -    long cellsSizeCompactedFromMob = 0;
++    StoreFile.Writer mobFileWriter = null, delFileWriter = null;
++    long mobCells = 0, deleteMarkersCount = 0;
++    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName());
++    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
++    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
+     try {
+       try {
+         // If the mob file writer could not be created, directly write the cell to the store file.
+         mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+             store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+         fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+       } catch (IOException e) {
 -        LOG.error(
 -            "Failed to create mob writer, "
 -                + "we will continue the compaction by writing MOB cells directly in store files",
 -            e);
++        LOG.error("Failed to create mob writer, "
++               + "we will continue the compaction by writing MOB cells directly in store files", e);
+       }
+       delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+           store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+       ScannerContext scannerContext =
+               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+       do {
+         hasMore = compactionScanner.next(cells, scannerContext);
 -        // output to writer:
+         for (Cell c : cells) {
+           if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
+             delFileWriter.append(c);
+             deleteMarkersCount++;
+           } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
+             // If the mob file writer is null or the kv type is not put, directly write the cell
+             // to the store file.
+             writer.append(c);
+           } else if (MobUtils.isMobReferenceCell(c)) {
+             if (MobUtils.hasValidMobRefCellValue(c)) {
+               int size = MobUtils.getMobValueLength(c);
+               if (size > mobSizeThreshold) {
+                 // If the value size is larger than the threshold, it's regarded as a mob. Since
+                 // its value is already in the mob file, directly write this cell to the store file
+                 writer.append(c);
+               } else {
+                 // If the value is not larger than the threshold, it's not regarded as a mob.
+                 // Retrieve the mob cell from the mob file, and write it back to the store file.
+                 Cell mobCell = mobStore.resolve(c, false);
+                 if (mobCell.getValueLength() != 0) {
+                   // put the mob data back to the store file
+                   CellUtil.setSequenceId(mobCell, c.getSequenceId());
+                   writer.append(mobCell);
+                   cellsCountCompactedFromMob++;
+                   cellsSizeCompactedFromMob += mobCell.getValueLength();
+                 } else {
+                   // If the value of a cell is empty, there might be issues when retrieving it;
+                   // directly write the cell to the store file, and leave it to be handled by the
+                   // next compaction.
+                   writer.append(c);
+                 }
+               }
+             } else {
+               LOG.warn("The value format of the KeyValue " + c
+                   + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+               writer.append(c);
+             }
+           } else if (c.getValueLength() <= mobSizeThreshold) {
 -            // If the value size of a cell is not larger than the threshold, directly write it to
 -            // the store file.
++            //If value size of a cell is not larger than the threshold, directly write to store file
+             writer.append(c);
+           } else {
+             // If the value size of a cell is larger than the threshold, it's regarded as a mob,
+             // write this cell to a mob file, and write the path to the store file.
+             mobCells++;
+             // append the original keyValue in the mob file.
+             mobFileWriter.append(c);
+             KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
+             // write the cell whose value is the path of a mob file to the store file.
+             writer.append(reference);
+             cellsCountCompactedToMob++;
+             cellsSizeCompactedToMob += c.getValueLength();
+           }
+           ++progress.currentCompactedKVs;
 -
+           // check periodically to see if a system stop is requested
+           if (closeCheckInterval > 0) {
+             bytesWritten += KeyValueUtil.length(c);
+             if (bytesWritten > closeCheckInterval) {
+               bytesWritten = 0;
+               if (!store.areWritesEnabled()) {
+                 progress.cancel();
+                 return false;
+               }
+             }
+           }
+         }
+         cells.clear();
+       } while (hasMore);
+     } finally {
+       if (mobFileWriter != null) {
+         mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
+         mobFileWriter.close();
+       }
+       if (delFileWriter != null) {
+         delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
+         delFileWriter.close();
+       }
+     }
+     if (mobFileWriter != null) {
+       if (mobCells > 0) {
+         // If the mob file is not empty, commit it.
+         mobStore.commitFile(mobFileWriter.getPath(), path);
+       } else {
+         try {
+           // If the mob file is empty, delete it instead of committing.
+           store.getFileSystem().delete(mobFileWriter.getPath(), true);
+         } catch (IOException e) {
+           LOG.error("Failed to delete the temp mob file", e);
+         }
+       }
+     }
+     if (delFileWriter != null) {
+       if (deleteMarkersCount > 0) {
+         // If the del file is not empty, commit it.
+         // If the commit fails, the compaction is re-performed again.
+         mobStore.commitFile(delFileWriter.getPath(), path);
+       } else {
+         try {
+           // If the del file is empty, delete it instead of committing.
+           store.getFileSystem().delete(delFileWriter.getPath(), true);
+         } catch (IOException e) {
+           LOG.error("Failed to delete the temp del file", e);
+         }
+       }
+     }
+     mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
+     mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
+     mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
+     mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
+     progress.complete();
+     return true;
+   }
+ }
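
For completeness: this compactor only takes effect for mob-enabled families. A minimal sketch of declaring one, assuming setMobEnabled/setMobThreshold are the setter counterparts of the isMobEnabled()/getMobThreshold() accessors used in this patch; the table and family names are hypothetical:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    // Sketch: a table descriptor with one mob-enabled column family.
    static HTableDescriptor mobTableDescriptor() {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));
      HColumnDescriptor hcd = new HColumnDescriptor("f");
      hcd.setMobEnabled(true);       // values in this family may be stored as mobs
      hcd.setMobThreshold(102400L);  // values larger than 100KB go to mob files
      htd.addFamily(hcd);
      return htd;
    }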
