HBASE-11339 Merge remote-tracking branch 'apache/hbase-11339' (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/493f36c8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/493f36c8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/493f36c8 Branch: refs/heads/master Commit: 493f36c899bc990de0400fbf777a2bf29c5c60e3 Parents: 2073954 7ddae39 Author: Jonathan M Hsieh <[email protected]> Authored: Wed Jul 22 12:49:42 2015 -0700 Committer: Jonathan M Hsieh <[email protected]> Committed: Wed Jul 22 12:51:11 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HColumnDescriptor.java | 58 +- .../org/apache/hadoop/hbase/client/Admin.java | 52 + .../apache/hadoop/hbase/client/HBaseAdmin.java | 102 ++ .../hbase/ipc/AsyncServerResponseHandler.java | 3 +- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 2 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 38 +- .../hadoop/hbase/protobuf/ProtobufUtil.java.rej | 16 + .../hadoop/hbase/TestHColumnDescriptor.java | 18 + .../java/org/apache/hadoop/hbase/TagType.java | 5 + .../apache/hadoop/hbase/util/PrettyPrinter.java | 10 + .../src/main/resources/hbase-default.xml | 117 ++ .../regionserver/MetricsRegionServerSource.java | 32 + .../MetricsRegionServerWrapper.java | 70 ++ .../MetricsRegionServerSourceImpl.java | 32 + .../hbase/IntegrationTestAcidGuarantees.java | 3 +- .../hbase/IntegrationTestIngestWithMOB.java | 156 +++ .../hbase/chaos/actions/CompactMobAction.java | 65 ++ .../chaos/factories/MobNoKillMonkeyFactory.java | 67 ++ .../MobSlowDeterministicMonkeyFactory.java | 156 +++ .../hbase/chaos/factories/MonkeyFactory.java | 4 + .../org/apache/hadoop/hbase/io/HFileLink.java | 23 +- .../hbase/io/hfile/HFilePrettyPrinter.java | 100 +- .../master/ExpiredMobFileCleanerChore.java | 107 ++ .../org/apache/hadoop/hbase/master/HMaster.java | 101 ++ .../hadoop/hbase/master/MasterFileSystem.java | 22 +- .../hbase/master/MasterMobCompactionThread.java | 184 ++++ .../hadoop/hbase/master/MasterRpcServices.java | 104 ++ .../hadoop/hbase/master/MobCompactionChore.java | 93 ++ .../hbase/master/cleaner/HFileLinkCleaner.java | 6 + .../procedure/AddColumnFamilyProcedure.java | 4 +- .../procedure/DeleteColumnFamilyProcedure.java | 6 +- .../master/procedure/DeleteTableProcedure.java | 43 +- .../procedure/MasterDDLOperationHelper.java | 5 +- .../master/procedure/ModifyTableProcedure.java | 3 +- .../snapshot/DisabledTableSnapshotHandler.java | 8 + .../master/snapshot/MasterSnapshotVerifier.java | 12 +- .../apache/hadoop/hbase/mob/CachedMobFile.java | 114 ++ .../hbase/mob/DefaultMobStoreCompactor.java | 300 +++++ .../hbase/mob/DefaultMobStoreFlusher.java | 226 ++++ .../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 120 ++ .../apache/hadoop/hbase/mob/MobCacheConfig.java | 63 ++ .../apache/hadoop/hbase/mob/MobConstants.java | 121 +++ .../org/apache/hadoop/hbase/mob/MobFile.java | 152 +++ .../apache/hadoop/hbase/mob/MobFileCache.java | 325 ++++++ .../apache/hadoop/hbase/mob/MobFileName.java | 169 +++ .../apache/hadoop/hbase/mob/MobStoreEngine.java | 48 + .../org/apache/hadoop/hbase/mob/MobUtils.java | 898 +++++++++++++++ .../mob/compactions/MobCompactionRequest.java | 64 ++ .../hbase/mob/compactions/MobCompactor.java | 90 ++ .../PartitionedMobCompactionRequest.java | 146 +++ .../compactions/PartitionedMobCompactor.java | 679 ++++++++++++ .../hbase/mob/mapreduce/MemStoreWrapper.java | 185 ++++ .../mapreduce/MobFilePathHashPartitioner.java | 41 + .../hadoop/hbase/mob/mapreduce/SweepJob.java | 603 ++++++++++ .../mob/mapreduce/SweepJobNodeTracker.java | 100 ++ .../hadoop/hbase/mob/mapreduce/SweepMapper.java | 87 ++ .../hbase/mob/mapreduce/SweepReducer.java | 472 ++++++++ .../hadoop/hbase/mob/mapreduce/Sweeper.java | 119 ++ .../hbase/regionserver/DefaultStoreEngine.java | 20 +- .../hbase/regionserver/DefaultStoreFlusher.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 585 ++++++++++ .../hadoop/hbase/regionserver/HRegion.java | 30 +- .../hbase/regionserver/HRegionServer.java | 6 + .../hadoop/hbase/regionserver/HStore.java | 58 +- .../MetricsRegionServerWrapperImpl.java | 145 ++- .../regionserver/MobCompactionStoreScanner.java | 66 ++ .../regionserver/MobReferenceOnlyFilter.java | 42 + .../hbase/regionserver/MobStoreScanner.java | 85 ++ .../regionserver/ReversedMobStoreScanner.java | 85 ++ .../hadoop/hbase/regionserver/StoreFile.java | 55 +- .../hbase/regionserver/StoreFileInfo.java | 30 +- .../hbase/regionserver/StoreFileScanner.java | 2 +- .../hadoop/hbase/regionserver/StoreScanner.java | 6 +- .../compactions/CompactionRequest.java | 19 + .../regionserver/compactions/Compactor.java | 34 +- .../compactions/DefaultCompactor.java | 31 +- .../compactions/StripeCompactor.java | 3 +- .../hadoop/hbase/snapshot/ExportSnapshot.java | 23 +- .../hbase/snapshot/RestoreSnapshotHelper.java | 90 +- .../hadoop/hbase/snapshot/SnapshotInfo.java | 49 +- .../hadoop/hbase/snapshot/SnapshotManifest.java | 171 ++- .../hbase/snapshot/SnapshotReferenceUtil.java | 12 +- .../hadoop/hbase/util/HFileArchiveUtil.java | 4 +- .../hbase/util/hbck/HFileCorruptionChecker.java | 197 ++++ .../apache/hadoop/hbase/TestAcidGuarantees.java | 66 +- .../client/TestMobCloneSnapshotFromClient.java | 252 +++++ .../TestMobRestoreSnapshotFromClient.java | 306 ++++++ .../TestMobSnapshotCloneIndependence.java | 435 ++++++++ .../hbase/client/TestMobSnapshotFromClient.java | 305 ++++++ .../apache/hadoop/hbase/mob/MobTestUtil.java | 87 ++ .../hadoop/hbase/mob/TestCachedMobFile.java | 154 +++ .../hbase/mob/TestDefaultMobStoreFlusher.java | 190 ++++ .../hbase/mob/TestExpiredMobFileCleaner.java | 175 +++ .../hbase/mob/TestMobDataBlockEncoding.java | 135 +++ .../apache/hadoop/hbase/mob/TestMobFile.java | 124 +++ .../hadoop/hbase/mob/TestMobFileCache.java | 202 ++++ .../hadoop/hbase/mob/TestMobFileLink.java | 66 ++ .../hadoop/hbase/mob/TestMobFileName.java | 79 ++ .../hbase/mob/compactions/TestMobCompactor.java | 1026 ++++++++++++++++++ .../TestPartitionedMobCompactionRequest.java | 60 + .../TestPartitionedMobCompactor.java | 440 ++++++++ .../hbase/mob/mapreduce/TestMobSweepJob.java | 169 +++ .../hbase/mob/mapreduce/TestMobSweepMapper.java | 120 ++ .../mob/mapreduce/TestMobSweepReducer.java | 219 ++++ .../hbase/mob/mapreduce/TestMobSweeper.java | 368 +++++++ .../MetricsRegionServerWrapperStub.java | 70 ++ .../hbase/regionserver/TestDeleteMobTable.java | 270 +++++ .../hbase/regionserver/TestHMobStore.java | 557 ++++++++++ .../regionserver/TestMobStoreCompaction.java | 460 ++++++++ .../hbase/regionserver/TestMobStoreScanner.java | 512 +++++++++ .../regionserver/TestRegionServerMetrics.java | 64 ++ .../hbase/regionserver/TestStoreFileInfo.java | 6 +- .../hbase/snapshot/MobSnapshotTestingUtils.java | 347 ++++++ .../hbase/snapshot/SnapshotTestingUtils.java | 9 +- .../hbase/snapshot/TestMobExportSnapshot.java | 439 ++++++++ .../TestMobFlushSnapshotFromClient.java | 547 ++++++++++ .../TestMobRestoreFlushSnapshotFromClient.java | 211 ++++ .../snapshot/TestMobRestoreSnapshotHelper.java | 160 +++ .../snapshot/TestMobSecureExportSnapshot.java | 53 + .../util/LoadTestDataGeneratorWithMOB.java | 73 ++ .../apache/hadoop/hbase/util/LoadTestTool.java | 17 + .../apache/hadoop/hbase/util/TestHBaseFsck.java | 109 ++ hbase-shell/src/main/ruby/hbase/admin.rb | 24 + hbase-shell/src/main/ruby/shell.rb | 2 + .../src/main/ruby/shell/commands/compact_mob.rb | 42 + .../ruby/shell/commands/major_compact_mob.rb | 42 + src/main/asciidoc/_chapters/hbase_mob.adoc | 236 ++++ src/main/asciidoc/book.adoc | 1 + 128 files changed, 17864 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 36f6fb5,d1c44b4..0de2762 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@@ -165,8 -169,8 +168,8 @@@ import com.google.protobuf.TextFormat /** * Protobufs utility. */ [email protected](value="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED", -- justification="None. Address sometime.") [email protected]( ++ value="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED", justification="None. Address sometime.") @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class public final class ProtobufUtil { @@@ -1585,7 -1623,7 +1588,7 @@@ throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( -- RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); ++ RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); try { CoprocessorServiceResponse response = client.execMasterService(null, request); @@@ -1887,7 -1925,7 +1890,8 @@@ if (proto.getType() != AccessControlProtos.Permission.Type.Global) { return toTablePermission(proto); } else { -- List<Permission.Action> actions = toPermissionActions(proto.getGlobalPermission().getActionList()); ++ List<Permission.Action> actions = toPermissionActions( ++ proto.getGlobalPermission().getActionList()); return new Permission(actions.toArray(new Permission.Action[actions.size()])); } } @@@ -1954,7 -1992,7 +1958,7 @@@ AccessControlProtos.NamespacePermission.Builder builder = AccessControlProtos.NamespacePermission.newBuilder(); builder.setNamespaceName(ByteString.copyFromUtf8(tablePerm.getNamespace())); -- Permission.Action actions[] = perm.getActions(); ++ Permission.Action[] actions = perm.getActions(); if (actions != null) { for (Permission.Action a : actions) { builder.addAction(toPermissionAction(a)); @@@ -2997,6 -3027,29 +3001,30 @@@ return desc.build(); } + /** + * This version of protobuf's mergeDelimitedFrom avoid the hard-coded 64MB limit for decoding + * buffers + * @param builder current message builder + * @param in Inputsream with delimited protobuf data + * @throws IOException + */ - public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException { ++ public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) ++ throws IOException { + // This used to be builder.mergeDelimitedFrom(in); + // but is replaced to allow us to bump the protobuf size limit. + final int firstByte = in.read(); + if (firstByte == -1) { + // bail out. (was return false;) + } else { + final int size = CodedInputStream.readRawVarint32(firstByte, in); + final InputStream limitedInput = new LimitInputStream(in, size); + final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput); + codedInput.setSizeLimit(size); + builder.mergeFrom(codedInput); + codedInput.checkLastTagWas(0); + } + } + public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink cls) { return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp()); http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej ---------------------------------------------------------------------- diff --cc hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej index 0000000,0000000..7cd2dad new file mode 100644 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java.rej @@@ -1,0 -1,0 +1,16 @@@ ++--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ++@@ -124,12 +126,12 @@ ++ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; ++ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos; +++import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; ++-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; ++ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; ++ import org.apache.hadoop.hbase.quotas.QuotaScope; ++ import org.apache.hadoop.hbase.quotas.QuotaType; http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java ---------------------------------------------------------------------- diff --cc hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index fab2a49,26c55bb..2ae8f4d --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@@ -259,6 -259,36 +259,38 @@@ public class MetricsRegionServerSourceI .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC), rsWrap.getMajorCompactedCellsSize()) - .addCounter(Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC), ++ .addCounter( ++ Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC), + rsWrap.getCellsCountCompactedFromMob()) + .addCounter(Interns.info(CELLS_COUNT_COMPACTED_TO_MOB, CELLS_COUNT_COMPACTED_TO_MOB_DESC), + rsWrap.getCellsCountCompactedToMob()) - .addCounter(Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC), ++ .addCounter( ++ Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC), + rsWrap.getCellsSizeCompactedFromMob()) + .addCounter(Interns.info(CELLS_SIZE_COMPACTED_TO_MOB, CELLS_SIZE_COMPACTED_TO_MOB_DESC), + rsWrap.getCellsSizeCompactedToMob()) + .addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC), + rsWrap.getMobFlushCount()) + .addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, MOB_FLUSHED_CELLS_COUNT_DESC), + rsWrap.getMobFlushedCellsCount()) + .addCounter(Interns.info(MOB_FLUSHED_CELLS_SIZE, MOB_FLUSHED_CELLS_SIZE_DESC), + rsWrap.getMobFlushedCellsSize()) + .addCounter(Interns.info(MOB_SCAN_CELLS_COUNT, MOB_SCAN_CELLS_COUNT_DESC), + rsWrap.getMobScanCellsCount()) + .addCounter(Interns.info(MOB_SCAN_CELLS_SIZE, MOB_SCAN_CELLS_SIZE_DESC), + rsWrap.getMobScanCellsSize()) + .addGauge(Interns.info(MOB_FILE_CACHE_COUNT, MOB_FILE_CACHE_COUNT_DESC), + rsWrap.getMobFileCacheCount()) + .addCounter(Interns.info(MOB_FILE_CACHE_ACCESS_COUNT, MOB_FILE_CACHE_ACCESS_COUNT_DESC), + rsWrap.getMobFileCacheAccessCount()) + .addCounter(Interns.info(MOB_FILE_CACHE_MISS_COUNT, MOB_FILE_CACHE_MISS_COUNT_DESC), + rsWrap.getMobFileCacheMissCount()) + .addCounter( + Interns.info(MOB_FILE_CACHE_EVICTED_COUNT, MOB_FILE_CACHE_EVICTED_COUNT_DESC), + rsWrap.getMobFileCacheEvictedCount()) + .addGauge(Interns.info(MOB_FILE_CACHE_HIT_PERCENT, MOB_FILE_CACHE_HIT_PERCENT_DESC), + rsWrap.getMobFileCacheHitPercent()) + .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps()) .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC), rsWrap.getHedgedReadWins()) http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java ---------------------------------------------------------------------- diff --cc hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java index 0000000,f832a2e..ba7f67f mode 000000,100644..100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MobNoKillMonkeyFactory.java @@@ -1,0 -1,70 +1,67 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.chaos.factories; + + import org.apache.hadoop.hbase.chaos.actions.*; + import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey; + import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; + import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; + import org.apache.hadoop.hbase.chaos.policies.TwoConcurrentActionPolicy; + + /** + * This is a copy of NoKillMonkeyFactory that also does mob compactions. + */ + public class MobNoKillMonkeyFactory extends MonkeyFactory { + @Override public ChaosMonkey build() { + Action[] actions1 = new Action[] { - new CompactMobAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD), - new CompactTableAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD), - new CompactRandomRegionOfTableAction(tableName, - MonkeyConstants.DEFAULT_COMPACT_RANDOM_REGION_RATIO), - new FlushTableAction(tableName), - new FlushRandomRegionOfTableAction(tableName), - new MoveRandomRegionOfTableAction(tableName) ++ new CompactMobAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD), ++ new CompactTableAction(tableName, MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD), ++ new CompactRandomRegionOfTableAction(tableName, ++ MonkeyConstants.DEFAULT_COMPACT_RANDOM_REGION_RATIO), ++ new FlushTableAction(tableName), ++ new FlushRandomRegionOfTableAction(tableName), ++ new MoveRandomRegionOfTableAction(tableName) + }; + + Action[] actions2 = new Action[] { - new SplitRandomRegionOfTableAction(tableName), - new MergeRandomAdjacentRegionsOfTableAction(tableName), - new SnapshotTableAction(tableName), - new AddColumnAction(tableName), - new RemoveColumnAction(tableName, columnFamilies), - new ChangeEncodingAction(tableName), - new ChangeCompressionAction(tableName), - new ChangeBloomFilterAction(tableName), - new ChangeVersionsAction(tableName) ++ new SplitRandomRegionOfTableAction(tableName), ++ new MergeRandomAdjacentRegionsOfTableAction(tableName), ++ new SnapshotTableAction(tableName), ++ new AddColumnAction(tableName), ++ new RemoveColumnAction(tableName, columnFamilies), ++ new ChangeEncodingAction(tableName), ++ new ChangeCompressionAction(tableName), ++ new ChangeBloomFilterAction(tableName), ++ new ChangeVersionsAction(tableName) + }; + + Action[] actions3 = new Action[] { - new MoveRegionsOfTableAction(MonkeyConstants.DEFAULT_MOVE_REGIONS_SLEEP_TIME, - MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME, - tableName), - new MoveRandomRegionOfTableAction(MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME, - tableName), - }; ++ new MoveRegionsOfTableAction(MonkeyConstants.DEFAULT_MOVE_REGIONS_SLEEP_TIME, ++ MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME, tableName), ++ new MoveRandomRegionOfTableAction(MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME, ++ tableName), }; + - Action[] actions4 = new Action[] { - new DumpClusterStatusAction() - }; ++ Action[] actions4 = new Action[] { new DumpClusterStatusAction() }; + + return new PolicyBasedChaosMonkey(util, - new TwoConcurrentActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD, actions1, actions2), - new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD,actions3), - new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION4_PERIOD,actions4)); ++ new TwoConcurrentActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD, actions1, ++ actions2), ++ new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD,actions3), ++ new PeriodicRandomActionPolicy(MonkeyConstants.DEFAULT_PERIODIC_ACTION4_PERIOD,actions4)); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java ---------------------------------------------------------------------- diff --cc hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java index 2f65251,f3d535b..069e8af --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java @@@ -69,7 -69,8 +69,9 @@@ public abstract class MonkeyFactory public static final String SERVER_KILLING = "serverKilling"; public static final String STRESS_AM = "stressAM"; public static final String NO_KILL = "noKill"; + public static final String MASTER_KILLING = "masterKilling"; + public static final String MOB_NO_KILL = "mobNoKill"; + public static final String MOB_SLOW_DETERMINISTIC = "mobSlowDeterministic"; public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder() .put(CALM, new CalmMonkeyFactory()) @@@ -78,7 -79,8 +80,9 @@@ .put(SERVER_KILLING, new ServerKillingMonkeyFactory()) .put(STRESS_AM, new StressAssignmentManagerMonkeyFactory()) .put(NO_KILL, new NoKillMonkeyFactory()) + .put(MASTER_KILLING, new MasterKillingMonkeyFactory()) + .put(MOB_NO_KILL, new MobNoKillMonkeyFactory()) + .put(MOB_SLOW_DETERMINISTIC, new MobNoKillMonkeyFactory()) .build(); public static MonkeyFactory getFactory(String factoryName) { http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index 025053e,dc12762..2818d88 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@@ -313,8 -332,11 +332,11 @@@ public class HFilePrettyPrinter extend private void scanKeysValues(Path file, KeyValueStatsCollector fileStats, HFileScanner scanner, byte[] row) throws IOException { Cell pCell = null; + FileSystem fs = FileSystem.get(getConf()); + Set<String> foundMobFiles = new LinkedHashSet<String>(FOUND_MOB_FILES_CACHE_CAPACITY); + Set<String> missingMobFiles = new LinkedHashSet<String>(MISSING_MOB_FILES_CACHE_CAPACITY); do { - Cell cell = scanner.getKeyValue(); + Cell cell = scanner.getCell(); if (row != null && row.length != 0) { int result = CellComparator.COMPARATOR.compareRows(cell, row, 0, row.length); if (result > 0) { http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java index 0000000,7ca3362..8c9bc2b mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@@ -1,0 -1,107 +1,107 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hadoop.hbase.master; + + import java.io.IOException; + import java.util.Map; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.ScheduledChore; + import org.apache.hadoop.hbase.TableDescriptors; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.exceptions.LockTimeoutException; + import org.apache.hadoop.hbase.master.TableLockManager.TableLock; + import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; + import org.apache.hadoop.hbase.mob.MobConstants; + import org.apache.hadoop.hbase.mob.MobUtils; + + /** + * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired + * mob files. + */ + @InterfaceAudience.Private + public class ExpiredMobFileCleanerChore extends ScheduledChore { + + private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class); + private final HMaster master; + private TableLockManager tableLockManager; + private ExpiredMobFileCleaner cleaner; + + public ExpiredMobFileCleanerChore(HMaster master) { + super(master.getServerName() + "-ExpiredMobFileCleanerChore", master, master.getConfiguration() + .getInt(MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master + .getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, + MobConstants.DEFAULT_MOB_CLEANER_PERIOD), TimeUnit.SECONDS); + this.master = master; + this.tableLockManager = master.getTableLockManager(); + cleaner = new ExpiredMobFileCleaner(); + cleaner.setConf(master.getConfiguration()); + } + + @Override + protected void chore() { + try { + TableDescriptors htds = master.getTableDescriptors(); + Map<String, HTableDescriptor> map = htds.getAll(); + for (HTableDescriptor htd : map.values()) { + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { + // clean only for mob-enabled column. + // obtain a read table lock before cleaning, synchronize with MobFileCompactionChore. + boolean tableLocked = false; + TableLock lock = null; + try { + // the tableLockManager might be null in testing. In that case, it is lock-free. + if (tableLockManager != null) { + lock = tableLockManager.readLock(MobUtils.getTableLockName(htd.getTableName()), + "Run ExpiredMobFileCleanerChore"); + lock.acquire(); + } + tableLocked = true; + cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd); + } catch (LockTimeoutException e) { + LOG.info("Fail to acquire the lock because of timeout, maybe a" + + " MobFileCompactor is running", e); + } catch (Exception e) { + LOG.error( + "Fail to clean the expired mob files for the column " + hcd.getNameAsString() + + " in the table " + htd.getNameAsString(), e); + } finally { + if (lock != null && tableLocked) { + try { + lock.release(); + } catch (IOException e) { + LOG.error( + "Fail to release the read lock for the table " + htd.getNameAsString(), e); + } + } + } + } + } + } + } catch (Exception e) { + LOG.error("Fail to clean the expired mob files", e); + } + } + + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bc52edb,6cc229a..f7d839b --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@@ -101,10 -102,8 +102,11 @@@ import org.apache.hadoop.hbase.master.p import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; + import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@@ -789,11 -793,20 +801,26 @@@ public class HMaster extends HRegionSer // master initialization. See HBASE-5916. this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer(); + // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration + status.setStatus("Checking ZNode ACLs"); + zooKeeper.checkAndSetZNodeAcls(); + + status.setStatus("Calling postStartMaster coprocessors"); ++ + this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this); + getChoreService().scheduleChore(expiredMobFileCleanerChore); + + int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, + MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD); + if (mobCompactionPeriod > 0) { + this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod); + getChoreService().scheduleChore(mobCompactChore); + } else { + LOG + .info("The period is " + mobCompactionPeriod + " seconds, MobCompactionChore is disabled"); + } + this.mobCompactThread = new MasterMobCompactionThread(this); + if (this.cpHost != null) { // don't let cp initialization errors kill the master try { http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 3718a5a,5088901..9b95ae7 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@@ -615,6 -591,22 +617,24 @@@ public class MasterFileSystem + ")"); } } ++ + // archive and delete mob files + if (hasMob) { + Path mobTableDir = + FSUtils.getTableDir(new Path(getRootDir(), MobConstants.MOB_DIR_NAME), region.getTable()); + HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(region.getTable()); + Path mobFamilyDir = + new Path(mobTableDir, + new Path(mobRegionInfo.getEncodedName(), Bytes.toString(familyName))); + // archive mob family store files + MobUtils.archiveMobStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, familyName); ++ + if (!fs.delete(mobFamilyDir, true)) { + throw new IOException("Could not delete mob store files for family " + + Bytes.toString(familyName) + " from FileSystem region " + + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")"); + } + } } public void stop() { http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java index 0000000,d1f58ba..f8a5c15 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java @@@ -1,0 -1,184 +1,184 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.master; + + import java.io.IOException; + import java.util.List; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.RejectedExecutionException; + import java.util.concurrent.SynchronousQueue; + import java.util.concurrent.ThreadFactory; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + + /** + * The mob compaction thread used in {@link MasterRpcServices} + */ + @InterfaceAudience.Private + public class MasterMobCompactionThread { + static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class); + private final HMaster master; + private final Configuration conf; + private final ExecutorService mobCompactorPool; + private final ExecutorService masterMobPool; + + public MasterMobCompactionThread(HMaster master) { + this.master = master; + this.conf = master.getConfiguration(); + final String n = Thread.currentThread().getName(); + // this pool is used to run the mob compaction + this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime()); + return t; + } + }); + ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true); + // this pool is used in the mob compaction to compact the mob files by partitions + // in parallel + this.mobCompactorPool = MobUtils + .createMobCompactorThreadPool(master.getConfiguration()); + } + + /** + * Requests mob compaction + * @param conf The Configuration + * @param fs The file system + * @param tableName The table the compact + * @param columns The column descriptors + * @param tableLockManager The tableLock manager + * @param allFiles Whether add all mob files into the compaction. + */ + public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName, + List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles) + throws IOException { + master.reportMobCompactionStart(tableName); + try { + masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager, + allFiles, mobCompactorPool)); + } catch (RejectedExecutionException e) { + // in case the request is rejected by the pool + try { + master.reportMobCompactionEnd(tableName); + } catch (IOException e1) { + LOG.error("Failed to mark end of mob compation", e1); + } + throw e; + } + if (LOG.isDebugEnabled()) { + LOG.debug("The mob compaction is requested for the columns " + columns + + " of the table " + tableName.getNameAsString()); + } + } + + private class CompactionRunner implements Runnable { + private FileSystem fs; + private TableName tableName; + private List<HColumnDescriptor> hcds; + private TableLockManager tableLockManager; + private boolean allFiles; + private ExecutorService pool; + + public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds, + TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) { + super(); + this.fs = fs; + this.tableName = tableName; + this.hcds = hcds; + this.tableLockManager = tableLockManager; + this.allFiles = allFiles; + this.pool = pool; + } + + @Override + public void run() { + try { + for (HColumnDescriptor hcd : hcds) { + MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager, + allFiles); + } + } catch (IOException e) { + LOG.error("Failed to perform the mob compaction", e); + } finally { + try { + master.reportMobCompactionEnd(tableName); + } catch (IOException e) { + LOG.error("Failed to mark end of mob compation", e); + } + } + } + } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + private void interruptIfNecessary() { + mobCompactorPool.shutdown(); + masterMobPool.shutdown(); + } + + /** + * Wait for all the threads finish. + */ + private void join() { + waitFor(mobCompactorPool, "Mob Compaction Thread"); + waitFor(masterMobPool, "Region Server Mob Compaction Thread"); + } + + /** + * Closes the MasterMobCompactionThread. + */ + public void close() { + interruptIfNecessary(); + join(); + } + + /** + * Wait for thread finish. + * @param t the thread to wait + * @param name the thread name. + */ + private void waitFor(ExecutorService t, String name) { + boolean done = false; + while (!done) { + try { + done = t.awaitTermination(60, TimeUnit.SECONDS); + LOG.info("Waiting for " + name + " to finish..."); + if (!done) { + t.shutdownNow(); + } + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for " + name + " to finish..."); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index c828880,7d025db..302d215 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@@ -1394,6 -1365,109 +1400,104 @@@ public class MasterRpcServices extends return response.build(); } + /** + * Compact a region on the master. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public CompactRegionResponse compactRegion(final RpcController controller, + final CompactRegionRequest request) throws ServiceException { + try { + master.checkInitialized(); + byte[] regionName = request.getRegion().getValue().toByteArray(); + TableName tableName = HRegionInfo.getTable(regionName); + // if the region is a mob region, do the mob file compaction. + if (MobUtils.isMobRegionName(tableName, regionName)) { + return compactMob(request, tableName); + } else { + return super.compactRegion(controller, request); + } + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public GetRegionInfoResponse getRegionInfo(final RpcController controller, + final GetRegionInfoRequest request) throws ServiceException { - try { - master.checkInitialized(); - byte[] regionName = request.getRegion().getValue().toByteArray(); - TableName tableName = HRegionInfo.getTable(regionName); - if (MobUtils.isMobRegionName(tableName, regionName)) { - // a dummy region info contains the compaction state. - HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName); - GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); - builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo)); - if (request.hasCompactionState() && request.getCompactionState()) { - builder.setCompactionState(master.getMobCompactionState(tableName)); - } - return builder.build(); - } else { - return super.getRegionInfo(controller, request); ++ byte[] regionName = request.getRegion().getValue().toByteArray(); ++ TableName tableName = HRegionInfo.getTable(regionName); ++ if (MobUtils.isMobRegionName(tableName, regionName)) { ++ // a dummy region info contains the compaction state. ++ HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName); ++ GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); ++ builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo)); ++ if (request.hasCompactionState() && request.getCompactionState()) { ++ builder.setCompactionState(master.getMobCompactionState(tableName)); + } - } catch (IOException ie) { - throw new ServiceException(ie); ++ return builder.build(); ++ } else { ++ return super.getRegionInfo(controller, request); + } + } + + /** + * Compacts the mob files in the current table. + * @param request the request. + * @param tableName the current table name. + * @return The response of the mob file compaction. + * @throws IOException + */ + private CompactRegionResponse compactMob(final CompactRegionRequest request, + TableName tableName) throws IOException { + if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) { + throw new DoNotRetryIOException("Table " + tableName + " is not enabled"); + } + boolean allFiles = false; + List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>(); + HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies(); + byte[] family = null; + if (request.hasFamily()) { + family = request.getFamily().toByteArray(); + for (HColumnDescriptor hcd : hcds) { + if (Bytes.equals(family, hcd.getName())) { + if (!hcd.isMobEnabled()) { + LOG.error("Column family " + hcd.getNameAsString() + " is not a mob column family"); + throw new DoNotRetryIOException("Column family " + hcd.getNameAsString() + + " is not a mob column family"); + } + compactedColumns.add(hcd); + } + } + } else { + for (HColumnDescriptor hcd : hcds) { + if (hcd.isMobEnabled()) { + compactedColumns.add(hcd); + } + } + } + if (compactedColumns.isEmpty()) { + LOG.error("No mob column families are assigned in the mob compaction"); + throw new DoNotRetryIOException( + "No mob column families are assigned in the mob compaction"); + } + if (request.hasMajor() && request.getMajor()) { + allFiles = true; + } + String familyLogMsg = (family != null) ? Bytes.toString(family) : ""; + if (LOG.isTraceEnabled()) { + LOG.trace("User-triggered mob compaction requested for table: " + + tableName.getNameAsString() + " for column family: " + familyLogMsg); + } + master.requestMobCompaction(tableName, compactedColumns, allFiles); + return CompactRegionResponse.newBuilder().build(); + } + @Override public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, IsBalancerEnabledRequest request) throws ServiceException { http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java index 0000000,f122955..4b956e6 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java @@@ -1,0 -1,93 +1,93 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.master; + + import java.util.Map; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.ScheduledChore; + import org.apache.hadoop.hbase.TableDescriptors; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.TableState; + import org.apache.hadoop.hbase.mob.MobUtils; + + /** + * The Class MobCompactChore for running compaction regularly to merge small mob files. + */ + @InterfaceAudience.Private + public class MobCompactionChore extends ScheduledChore { + + private static final Log LOG = LogFactory.getLog(MobCompactionChore.class); + private HMaster master; + private TableLockManager tableLockManager; + private ExecutorService pool; + + public MobCompactionChore(HMaster master, int period) { + // use the period as initial delay. + super(master.getServerName() + "-MobCompactionChore", master, period, period, TimeUnit.SECONDS); + this.master = master; + this.tableLockManager = master.getTableLockManager(); + this.pool = MobUtils.createMobCompactorThreadPool(master.getConfiguration()); + } + + @Override + protected void chore() { + try { + TableDescriptors htds = master.getTableDescriptors(); + Map<String, HTableDescriptor> map = htds.getAll(); + for (HTableDescriptor htd : map.values()) { + if (!master.getTableStateManager().isTableState(htd.getTableName(), + TableState.State.ENABLED)) { + continue; + } + boolean reported = false; + try { + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + if (!hcd.isMobEnabled()) { + continue; + } + if (!reported) { + master.reportMobCompactionStart(htd.getTableName()); + reported = true; + } + MobUtils.doMobCompaction(master.getConfiguration(), master.getFileSystem(), + htd.getTableName(), hcd, pool, tableLockManager, false); + } + } finally { + if (reported) { + master.reportMobCompactionEnd(htd.getTableName()); + } + } + } + } catch (Exception e) { + LOG.error("Failed to compact mob files", e); + } + } + + @Override + protected void cleanup() { + super.cleanup(); + pool.shutdown(); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 6e52ec2,fb9754a..75edfab --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@@ -284,11 -284,8 +285,14 @@@ public class DeleteColumnFamilyProcedur throw new InvalidFamilyOperationException("Family '" + getColumnFamilyName() + "' does not exist, so it cannot be deleted"); } + + if (unmodifiedHTableDescriptor.getColumnFamilies().length == 1) { + throw new InvalidFamilyOperationException("Family '" + getColumnFamilyName() + + "' is the only column family in the table, so it cannot be deleted"); + } ++ + // whether mob family + hasMob = unmodifiedHTableDescriptor.getFamily(familyName).isMobEnabled(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index 60212e8,45b7b51..c0a5091 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@@ -18,8 -18,8 +18,8 @@@ package org.apache.hadoop.hbase.master.procedure; --import java.io.InputStream; import java.io.IOException; ++import java.io.InputStream; import java.io.OutputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@@ -27,16 -27,12 +27,17 @@@ import java.util.List import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; --import org.apache.hadoop.hbase.classification.InterfaceAudience; --import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.hbase.HRegionInfo; ++import org.apache.hadoop.hbase.HTableDescriptor; ++import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotFoundException; - import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.backup.HFileArchiver; - import org.apache.hadoop.hbase.MetaTableAccessor; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; @@@ -44,15 -40,17 +45,17 @@@ import org.apache.hadoop.hbase.client.R import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; -import org.apache.hadoop.hbase.mob.MobConstants; -import org.apache.hadoop.hbase.mob.MobUtils; --import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; ++import org.apache.hadoop.hbase.mob.MobConstants; ++import org.apache.hadoop.hbase.mob.MobUtils; ++import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; ++import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; --import org.apache.hadoop.hbase.protobuf.ProtobufUtil; --import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; ++import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.security.UserGroupInformation; @@@ -403,8 -425,6 +428,8 @@@ public class DeleteTableProcedur protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName) throws IOException { - ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName); + if (!tableName.isSystemTable()) { + ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName); + } } - } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java index 0000000,457eb6c..7c4d6fe mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java @@@ -1,0 -1,114 +1,114 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + import java.util.concurrent.atomic.AtomicLong; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.StoreFile; + + /** + * Cached mob file. + */ + @InterfaceAudience.Private + public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> { + + private long accessCount; + private AtomicLong referenceCount = new AtomicLong(0); + + public CachedMobFile(StoreFile sf) { + super(sf); + } + + public static CachedMobFile create(FileSystem fs, Path path, Configuration conf, + CacheConfig cacheConf) throws IOException { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + return new CachedMobFile(sf); + } + + public void access(long accessCount) { + this.accessCount = accessCount; + } + + public int compareTo(CachedMobFile that) { + if (this.accessCount == that.accessCount) return 0; + return this.accessCount < that.accessCount ? 1 : -1; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof CachedMobFile)) { + return false; + } + return compareTo((CachedMobFile) obj) == 0; + } + + @Override + public int hashCode() { + return (int)(accessCount ^ (accessCount >>> 32)); + } + + /** + * Opens the mob file if it's not opened yet and increases the reference. + * It's not thread-safe. Use MobFileCache.openFile() instead. + * The reader of the mob file is just opened when it's not opened no matter how many times + * this open() method is invoked. + * The reference is a counter that how many times this reader is referenced. When the + * reference is 0, this reader is closed. + */ + @Override + public void open() throws IOException { + super.open(); + referenceCount.incrementAndGet(); + } + + /** + * Decreases the reference of the underlying reader for the mob file. + * It's not thread-safe. Use MobFileCache.closeFile() instead. + * This underlying reader isn't closed until the reference is 0. + */ + @Override + public void close() throws IOException { + long refs = referenceCount.decrementAndGet(); + if (refs == 0) { + super.close(); + } + } + + /** + * Gets the reference of the current mob file. + * Internal usage, currently it's for testing. + * @return The reference of the current mob file. + */ + public long getReferenceCount() { + return this.referenceCount.longValue(); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 0000000,c4d2a45..1670f7b mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@@ -1,0 -1,302 +1,300 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Date; + import java.util.List; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellUtil; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.KeyValueUtil; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.*; ++import org.apache.hadoop.hbase.regionserver.HMobStore; ++import org.apache.hadoop.hbase.regionserver.HStore; ++import org.apache.hadoop.hbase.regionserver.InternalScanner; ++import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner; ++import org.apache.hadoop.hbase.regionserver.ScanType; ++import org.apache.hadoop.hbase.regionserver.ScannerContext; ++import org.apache.hadoop.hbase.regionserver.Store; ++import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; ++import org.apache.hadoop.hbase.regionserver.StoreFileScanner; + import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; + import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; + import org.apache.hadoop.hbase.util.Bytes; + + /** + * Compact passed set of files in the mob-enabled column family. + */ + @InterfaceAudience.Private + public class DefaultMobStoreCompactor extends DefaultCompactor { + + private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class); + private long mobSizeThreshold; + private HMobStore mobStore; + public DefaultMobStoreCompactor(Configuration conf, Store store) { + super(conf, store); + // The mob cells reside in the mob-enabled column family which is held by HMobStore. + // During the compaction, the compactor reads the cells from the mob files and + // probably creates new mob files. All of these operations are included in HMobStore, + // so we need to cast the Store to HMobStore. + if (!(store instanceof HMobStore)) { + throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); + } + mobStore = (HMobStore) store; + mobSizeThreshold = store.getFamily().getMobThreshold(); + } + + /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + @Override + protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException { + // make this writer with tags always because of possible new cells with tags. + StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, + true, true, true); + return writer; + } + + @Override + protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + if (scanType == ScanType.COMPACT_DROP_DELETES) { + // In major compaction, we need to write the delete markers to del files, so we have to + // retain the them in scanning. + scanType = ScanType.COMPACT_RETAIN_DELETES; + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, true); + } else { + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, false); + } + } + + // TODO refactor to take advantage of the throughput controller. + + /** + * Performs compaction on a column family with the mob flag enabled. + * This is for when the mob threshold size has changed or if the mob + * column family mode has been toggled via an alter table statement. + * Compacts the files by the following rules. + * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * directly copy the (with mob tag) cell into the new store file. + * </li> + * <li> + * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into + * the new store file. + * </li> + * </ol> + * 2. If the cell doesn't have a reference tag. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * write this cell to a mob file, and write the path of this mob file to the store file. + * </li> + * <li> + * Otherwise, directly write this cell into the store file. + * </li> + * </ol> + * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner + * which could output the normal cells and delete markers together when required. + * After the major compaction on the normal hfiles, we have a guarantee that we have purged all + * deleted or old version mob refs, and the delete markers are written to a del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the + * mob files. When the small mob files are merged into bigger ones, the del file is added into + * the scanner to filter the deleted cells. + * @param fd File details + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point. + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param throughputController The compaction throughput controller. + * @param major Is a major compaction. + * @return Whether compaction ended; false if it was interrupted for any reason. + */ + @Override + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + long smallestReadPoint, boolean cleanSeqId, + CompactionThroughputController throughputController, boolean major) throws IOException { + if (!(scanner instanceof MobCompactionStoreScanner)) { + throw new IllegalArgumentException( - "The scanner should be an instance of MobCompactionStoreScanner"); ++ "The scanner should be an instance of MobCompactionStoreScanner"); + } + MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner; + int bytesWritten = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. + List<Cell> cells = new ArrayList<Cell>(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + int closeCheckInterval = HStore.getCloseCheckInterval(); + boolean hasMore; + Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); + byte[] fileName = null; - StoreFile.Writer mobFileWriter = null; - StoreFile.Writer delFileWriter = null; - long mobCells = 0; - long deleteMarkersCount = 0; - Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() - .getName()); - long cellsCountCompactedToMob = 0; - long cellsCountCompactedFromMob = 0; - long cellsSizeCompactedToMob = 0; - long cellsSizeCompactedFromMob = 0; ++ StoreFile.Writer mobFileWriter = null, delFileWriter = null; ++ long mobCells = 0, deleteMarkersCount = 0; ++ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName()); ++ long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; ++ long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; + try { + try { + // If the mob file writer could not be created, directly write the cell to the store file. + mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + } catch (IOException e) { - LOG.error( - "Failed to create mob writer, " - + "we will continue the compaction by writing MOB cells directly in store files", - e); ++ LOG.error("Failed to create mob writer, " ++ + "we will continue the compaction by writing MOB cells directly in store files", e); + } + delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { + hasMore = compactionScanner.next(cells, scannerContext); - // output to writer: + for (Cell c : cells) { + if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) { + delFileWriter.append(c); + deleteMarkersCount++; + } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + // If the mob file writer is null or the kv type is not put, directly write the cell + // to the store file. + writer.append(c); + } else if (MobUtils.isMobReferenceCell(c)) { + if (MobUtils.hasValidMobRefCellValue(c)) { + int size = MobUtils.getMobValueLength(c); + if (size > mobSizeThreshold) { + // If the value size is larger than the threshold, it's regarded as a mob. Since + // its value is already in the mob file, directly write this cell to the store file + writer.append(c); + } else { + // If the value is not larger than the threshold, it's not regarded a mob. Retrieve + // the mob cell from the mob file, and write it back to the store file. + Cell mobCell = mobStore.resolve(c, false); + if (mobCell.getValueLength() != 0) { + // put the mob data back to the store file + CellUtil.setSequenceId(mobCell, c.getSequenceId()); + writer.append(mobCell); + cellsCountCompactedFromMob++; + cellsSizeCompactedFromMob += mobCell.getValueLength(); + } else { + // If the value of a file is empty, there might be issues when retrieving, + // directly write the cell to the store file, and leave it to be handled by the + // next compaction. + writer.append(c); + } + } + } else { + LOG.warn("The value format of the KeyValue " + c + + " is wrong, its length is less than " + Bytes.SIZEOF_INT); + writer.append(c); + } + } else if (c.getValueLength() <= mobSizeThreshold) { - // If the value size of a cell is not larger than the threshold, directly write it to - // the store file. ++ //If value size of a cell is not larger than the threshold, directly write to store file + writer.append(c); + } else { + // If the value size of a cell is larger than the threshold, it's regarded as a mob, + // write this cell to a mob file, and write the path to the store file. + mobCells++; + // append the original keyValue in the mob file. + mobFileWriter.append(c); + KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag); + // write the cell whose value is the path of a mob file to the store file. + writer.append(reference); + cellsCountCompactedToMob++; + cellsSizeCompactedToMob += c.getValueLength(); + } + ++progress.currentCompactedKVs; - + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { + bytesWritten += KeyValueUtil.length(c); + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } + } + } + } + cells.clear(); + } while (hasMore); + } finally { + if (mobFileWriter != null) { + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); + } + if (delFileWriter != null) { + delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); + delFileWriter.close(); + } + } + if (mobFileWriter != null) { + if (mobCells > 0) { + // If the mob file is not empty, commit it. + mobStore.commitFile(mobFileWriter.getPath(), path); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Failed to delete the temp mob file", e); + } + } + } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. + // If the commit fails, the compaction is re-performed again. + mobStore.commitFile(delFileWriter.getPath(), path); + } else { + try { + // If the del file is empty, delete it instead of committing. + store.getFileSystem().delete(delFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Failed to delete the temp del file", e); + } + } + } + mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); + mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); + mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); + mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); + progress.complete(); + return true; + } + }
