http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fcc93db,91c406c..17c1ee3 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@@ -44,16 -42,14 +43,19 @@@ import org.apache.hadoop.hbase.exceptio import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; + import org.apache.hadoop.hbase.procedure2.Procedure; + import org.apache.hadoop.hbase.procedure2.ProcedureResult; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.*; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; + import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@@ -1311,108 -1352,11 +1358,116 @@@ public class MasterRpcServices extends return response.build(); } + /** + * Compact a region on the master. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public CompactRegionResponse compactRegion(final RpcController controller, + final CompactRegionRequest request) throws ServiceException { + try { + master.checkInitialized(); + byte[] regionName = request.getRegion().getValue().toByteArray(); + TableName tableName = HRegionInfo.getTable(regionName); + // if the region is a mob region, do the mob file compaction. + if (MobUtils.isMobRegionName(tableName, regionName)) { + return compactMob(request, tableName); + } else { + return super.compactRegion(controller, request); + } + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public GetRegionInfoResponse getRegionInfo(final RpcController controller, + final GetRegionInfoRequest request) throws ServiceException { + try { + master.checkInitialized(); + byte[] regionName = request.getRegion().getValue().toByteArray(); + TableName tableName = HRegionInfo.getTable(regionName); + if (MobUtils.isMobRegionName(tableName, regionName)) { + // a dummy region info contains the compaction state. 
+ HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName); + GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); + builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo)); + if (request.hasCompactionState() && request.getCompactionState()) { + builder.setCompactionState(master.getMobCompactionState(tableName)); + } + return builder.build(); + } else { + return super.getRegionInfo(controller, request); + } + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Compacts the mob files in the current table. + * @param request the request. + * @param tableName the current table name. + * @return The response of the mob file compaction. + * @throws IOException + */ + private CompactRegionResponse compactMob(final CompactRegionRequest request, + TableName tableName) throws IOException { + if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) { + throw new DoNotRetryIOException("Table " + tableName + " is not enabled"); + } + boolean isForceAllFiles = false; + List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>(); + HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies(); + byte[] family = null; + if (request.hasFamily()) { + family = request.getFamily().toByteArray(); + for (HColumnDescriptor hcd : hcds) { + if (Bytes.equals(family, hcd.getName())) { + if (!hcd.isMobEnabled()) { + LOG.error("Column family " + hcd.getName() + " is not a mob column family"); + throw new DoNotRetryIOException("Column family " + hcd.getName() - + " is not a mob column family"); ++ + " is not a mob column family"); + } + compactedColumns.add(hcd); + } + } + } else { + for (HColumnDescriptor hcd : hcds) { + if (hcd.isMobEnabled()) { + compactedColumns.add(hcd); + } + } + } + if (compactedColumns.isEmpty()) { + LOG.error("No mob column families are assigned in the mob file compaction"); + throw new DoNotRetryIOException( - "No mob column families are assigned in the mob file compaction"); ++ "No mob column families are assigned in the mob file compaction"); + } + if (request.hasMajor() && request.getMajor()) { + isForceAllFiles = true; + } + String familyLogMsg = (family != null) ? Bytes.toString(family) : ""; + if (LOG.isTraceEnabled()) { + LOG.trace("User-triggered mob file compaction requested for table: " - + tableName.getNameAsString() + " for column family: " + familyLogMsg); ++ + tableName.getNameAsString() + " for column family: " + familyLogMsg); + } + master.mobFileCompactThread.requestMobFileCompaction(master.getConfiguration(), - master.getFileSystem(), tableName, compactedColumns, - master.getTableLockManager(), isForceAllFiles); ++ master.getFileSystem(), tableName, compactedColumns, ++ master.getTableLockManager(), isForceAllFiles); + return CompactRegionResponse.newBuilder().build(); + } ++ + @Override + public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, + IsBalancerEnabledRequest request) throws ServiceException { + IsBalancerEnabledResponse.Builder response = IsBalancerEnabledResponse.newBuilder(); + response.setEnabled(master.isBalancerOn()); + return response.build(); + } }
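The compactMob() helper above picks which column families join a master-triggered mob compaction: a family named in the request must be mob-enabled, otherwise every mob-enabled family of the table is taken, and an empty selection is an error. Below is a minimal standalone sketch of that selection rule; FamilyDescriptor and selectFamilies are hypothetical stand-ins rather than the HColumnDescriptor API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MobCompactionFamilySelection {

  // Hypothetical stand-in for HColumnDescriptor: only the name and the mob flag matter here.
  static final class FamilyDescriptor {
    final String name;
    final boolean mobEnabled;
    FamilyDescriptor(String name, boolean mobEnabled) {
      this.name = name;
      this.mobEnabled = mobEnabled;
    }
  }

  // Mirrors the selection in compactMob(): a requested family must be mob-enabled,
  // no requested family means "all mob-enabled families", an empty result is an error.
  static List<FamilyDescriptor> selectFamilies(List<FamilyDescriptor> all, String requestedFamily) {
    List<FamilyDescriptor> selected = new ArrayList<FamilyDescriptor>();
    for (FamilyDescriptor fd : all) {
      if (requestedFamily != null) {
        if (fd.name.equals(requestedFamily)) {
          if (!fd.mobEnabled) {
            throw new IllegalArgumentException("Column family " + fd.name
                + " is not a mob column family");
          }
          selected.add(fd);
        }
      } else if (fd.mobEnabled) {
        selected.add(fd);
      }
    }
    if (selected.isEmpty()) {
      throw new IllegalArgumentException(
          "No mob column families are assigned in the mob file compaction");
    }
    return selected;
  }

  public static void main(String[] args) {
    List<FamilyDescriptor> families = Arrays.asList(
        new FamilyDescriptor("f", false), new FamilyDescriptor("mob", true));
    System.out.println(selectFamilies(families, null).size());   // 1: the mob-enabled family
    System.out.println(selectFamilies(families, "mob").size());  // 1: the requested family
  }
}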
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 0664a55,d729cfa..cbff5dd --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@@ -27,9 -27,15 +27,10 @@@ import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CoordinatedStateException; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index 0000000,2582a1e..7809e55 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@@ -1,0 -1,422 +1,450 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.hadoop.hbase.master.procedure; + + import java.io.InputStream; + import java.io.IOException; + import java.io.OutputStream; + import java.security.PrivilegedExceptionAction; + import java.util.ArrayList; + import java.util.List; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.hbase.*; + import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.HRegionInfo; + import org.apache.hadoop.hbase.backup.HFileArchiver; -import org.apache.hadoop.hbase.MetaTableAccessor; + import org.apache.hadoop.hbase.client.ClusterConnection; + import org.apache.hadoop.hbase.client.Delete; + import org.apache.hadoop.hbase.client.Result; + import org.apache.hadoop.hbase.client.ResultScanner; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.client.Table; + import org.apache.hadoop.hbase.exceptions.HBaseException; ++import org.apache.hadoop.hbase.mob.MobConstants; ++import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.regionserver.HRegion; + import org.apache.hadoop.hbase.master.AssignmentManager; + import org.apache.hadoop.hbase.master.MasterCoprocessorHost; + import org.apache.hadoop.hbase.master.MasterFileSystem; + import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; + import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; + import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; + import org.apache.hadoop.hbase.protobuf.ProtobufUtil; + import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; + import org.apache.hadoop.hbase.quotas.MasterQuotaManager; + import org.apache.hadoop.hbase.util.FSUtils; + import org.apache.hadoop.security.UserGroupInformation; + + @InterfaceAudience.Private + public class DeleteTableProcedure + extends StateMachineProcedure<MasterProcedureEnv, DeleteTableState> + implements TableProcedureInterface { + private static final Log LOG = LogFactory.getLog(DeleteTableProcedure.class); + + private List<HRegionInfo> regions; + private UserGroupInformation user; + private TableName tableName; + + // used for compatibility with old clients + private final ProcedurePrepareLatch syncLatch; + + public DeleteTableProcedure() { + // Required by the Procedure framework to create the procedure on replay + syncLatch = null; + } + + public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName) + throws IOException { + this(env, tableName, null); + } + + public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName, + final ProcedurePrepareLatch syncLatch) throws IOException { + this.tableName = tableName; + this.user = env.getRequestUser().getUGI(); + + // used for compatibility with clients without procedures + // they need a sync TableNotFoundException, TableNotDisabledException, ... 
+ this.syncLatch = syncLatch; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + try { + switch (state) { + case DELETE_TABLE_PRE_OPERATION: + // Verify if we can delete the table + boolean deletable = prepareDelete(env); + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + if (!deletable) { + assert isFailed() : "the delete should have an exception here"; + return Flow.NO_MORE_STATE; + } + + // TODO: Move out... in the acquireLock() + LOG.debug("waiting for '" + getTableName() + "' regions in transition"); + regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; + ProcedureSyncWait.waitRegionInTransition(env, regions); + + // Call coprocessors + preDelete(env); + + setNextState(DeleteTableState.DELETE_TABLE_REMOVE_FROM_META); + break; + case DELETE_TABLE_REMOVE_FROM_META: + LOG.debug("delete '" + getTableName() + "' regions from META"); + DeleteTableProcedure.deleteFromMeta(env, getTableName(), regions); + setNextState(DeleteTableState.DELETE_TABLE_CLEAR_FS_LAYOUT); + break; + case DELETE_TABLE_CLEAR_FS_LAYOUT: + LOG.debug("delete '" + getTableName() + "' from filesystem"); + DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true); + setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE); + regions = null; + break; + case DELETE_TABLE_UPDATE_DESC_CACHE: + LOG.debug("delete '" + getTableName() + "' descriptor"); + DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName()); + setNextState(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS); + break; + case DELETE_TABLE_UNASSIGN_REGIONS: + LOG.debug("delete '" + getTableName() + "' assignment state"); + DeleteTableProcedure.deleteAssignmentState(env, getTableName()); + setNextState(DeleteTableState.DELETE_TABLE_POST_OPERATION); + break; + case DELETE_TABLE_POST_OPERATION: + postDelete(env); + LOG.debug("delete '" + getTableName() + "' completed"); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (HBaseException|IOException e) { + LOG.warn("Retriable error trying to delete table=" + getTableName() + " state=" + state, e); + } catch (InterruptedException e) { + // if the interrupt is real, the executor will be stopped. + LOG.warn("Interrupted trying to delete table=" + getTableName() + " state=" + state, e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final DeleteTableState state) { + if (state == DeleteTableState.DELETE_TABLE_PRE_OPERATION) { + // nothing to rollback, pre-delete is just table-state checks. + // We can fail if the table does not exist or is not disabled. + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return; + } + + // The delete doesn't have a rollback. The execution will succeed, at some point. 
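For orientation, the executeFromState() switch above advances through the DeleteTableState values in a fixed order, and a retriable HBaseException or IOException simply leaves the state unchanged and returns Flow.HAS_MORE_STATE so the same step runs again. The sketch below illustrates that shape with plain JDK types; the State enum, step() and the simulated failure are illustrative assumptions, not the procedure-v2 API.

import java.io.IOException;
import java.util.concurrent.Callable;

public class DeleteTableFlowSketch {

  // Illustrative mirror of DeleteTableState, in execution order.
  enum State { PRE_OPERATION, REMOVE_FROM_META, CLEAR_FS_LAYOUT, UPDATE_DESC_CACHE,
               UNASSIGN_REGIONS, POST_OPERATION, DONE }

  // Runs one step; a retriable failure leaves the state unchanged so the step is retried,
  // which is what returning Flow.HAS_MORE_STATE after a caught exception amounts to.
  static State step(State state, Callable<Void> work) {
    try {
      work.call();
    } catch (Exception e) {
      System.out.println("retriable error in " + state + ", will retry: " + e.getMessage());
      return state;
    }
    return State.values()[state.ordinal() + 1];
  }

  public static void main(String[] args) {
    State state = State.PRE_OPERATION;
    final int[] attempts = {0};
    while (state != State.DONE) {
      final State current = state;
      state = step(state, new Callable<Void>() {
        @Override public Void call() throws IOException {
          // Fail the meta cleanup once to show the retry; every other step succeeds.
          if (current == State.REMOVE_FROM_META && attempts[0]++ == 0) {
            throw new IOException("simulated transient failure");
          }
          System.out.println("completed " + current);
          return null;
        }
      });
    }
  }
}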
+ throw new UnsupportedOperationException("unhandled state=" + state); + } + + @Override + protected DeleteTableState getState(final int stateId) { + return DeleteTableState.valueOf(stateId); + } + + @Override + protected int getStateId(final DeleteTableState state) { + return state.getNumber(); + } + + @Override + protected DeleteTableState getInitialState() { + return DeleteTableState.DELETE_TABLE_PRE_OPERATION; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.DELETE; + } + + @Override + public boolean abort(final MasterProcedureEnv env) { + // TODO: We may be able to abort if the procedure is not started yet. + return false; + } + + @Override + protected boolean acquireLock(final MasterProcedureEnv env) { + if (!env.isInitialized()) return false; + return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "delete table"); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().releaseTableWrite(getTableName()); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(getTableName()); + sb.append(") user="); + sb.append(user); + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.DeleteTableStateData.Builder state = + MasterProcedureProtos.DeleteTableStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (regions != null) { + for (HRegionInfo hri: regions) { + state.addRegionInfo(HRegionInfo.convert(hri)); + } + } + state.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.DeleteTableStateData state = + MasterProcedureProtos.DeleteTableStateData.parseDelimitedFrom(stream); + user = MasterProcedureUtil.toUserInfo(state.getUserInfo()); + tableName = ProtobufUtil.toTableName(state.getTableName()); + if (state.getRegionInfoCount() == 0) { + regions = null; + } else { + regions = new ArrayList<HRegionInfo>(state.getRegionInfoCount()); + for (HBaseProtos.RegionInfo hri: state.getRegionInfoList()) { + regions.add(HRegionInfo.convert(hri)); + } + } + } + + private boolean prepareDelete(final MasterProcedureEnv env) throws IOException { + try { + env.getMasterServices().checkTableModifiable(tableName); + } catch (TableNotFoundException|TableNotDisabledException e) { + setFailure("master-delete-table", e); + return false; + } + return true; + } + + private boolean preDelete(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final TableName tableName = this.tableName; + user.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + cpHost.preDeleteTableHandler(tableName); + return null; + } + }); + } + return true; + } + + private void postDelete(final MasterProcedureEnv env) + throws IOException, InterruptedException { + deleteTableStates(env, tableName); + + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final TableName tableName = this.tableName; + user.doAs(new 
PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + cpHost.postDeleteTableHandler(tableName); + return null; + } + }); + } + } + + protected static void deleteFromFs(final MasterProcedureEnv env, + final TableName tableName, final List<HRegionInfo> regions, + final boolean archive) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final FileSystem fs = mfs.getFileSystem(); + final Path tempdir = mfs.getTempDir(); + + final Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName); + final Path tempTableDir = FSUtils.getTableDir(tempdir, tableName); + + if (fs.exists(tableDir)) { + // Ensure temp exists + if (!fs.exists(tempdir) && !fs.mkdirs(tempdir)) { + throw new IOException("HBase temp directory '" + tempdir + "' creation failure."); + } + + // Ensure parent exists + if (!fs.exists(tempTableDir.getParent()) && !fs.mkdirs(tempTableDir.getParent())) { + throw new IOException("HBase temp directory '" + tempdir + "' creation failure."); + } + + // Move the table in /hbase/.tmp + if (!fs.rename(tableDir, tempTableDir)) { + if (fs.exists(tempTableDir)) { + // TODO + // what's in this dir? something old? probably something manual from the user... + // let's get rid of this stuff... + FileStatus[] files = fs.listStatus(tempdir); + if (files != null && files.length > 0) { + for (int i = 0; i < files.length; ++i) { + if (!files[i].isDir()) continue; + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, files[i].getPath()); + } + } + fs.delete(tempdir, true); + } + throw new IOException("Unable to move '" + tableDir + "' to temp '" + tempTableDir + "'"); + } + } + + // Archive regions from FS (temp directory) + if (archive) { + for (HRegionInfo hri : regions) { + LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS"); + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), + tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); + } + LOG.debug("Table '" + tableName + "' archived!"); + } + ++ // Archive the mob data if there is a mob-enabled column ++ HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(tableName); ++ HColumnDescriptor[] hcds = htd.getColumnFamilies(); ++ boolean hasMob = false; ++ for (HColumnDescriptor hcd : hcds) { ++ if (hcd.isMobEnabled()) { ++ hasMob = true; ++ break; ++ } ++ } ++ Path mobTableDir = null; ++ if (hasMob) { ++ // Archive mob data ++ mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), ++ tableName); ++ Path regionDir = ++ new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName()); ++ if (fs.exists(regionDir)) { ++ HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir); ++ } ++ } ++ ++ + // Delete table directory from FS (temp directory) + if (!fs.delete(tempTableDir, true) && fs.exists(tempTableDir)) { + throw new IOException("Couldn't delete " + tempTableDir); + } ++ ++ // Delete the table directory where the mob files are saved ++ if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) { ++ if (!fs.delete(mobTableDir, true)) { ++ LOG.error("Couldn't delete " + mobTableDir); ++ } ++ } + } + + /** + * There may be items for this table still up in hbase:meta in the case where the + * info:regioninfo column was empty because of some write error. Remove ALL rows from hbase:meta + * that have to do with this table. See HBASE-12980. 
+ * @throws IOException + */ + private static void cleanAnyRemainingRows(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + ClusterConnection connection = env.getMasterServices().getConnection(); + Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName); + try (Table metaTable = + connection.getTable(TableName.META_TABLE_NAME)) { + List<Delete> deletes = new ArrayList<Delete>(); + try (ResultScanner resScanner = metaTable.getScanner(tableScan)) { + for (Result result : resScanner) { + deletes.add(new Delete(result.getRow())); + } + } + if (!deletes.isEmpty()) { + LOG.warn("Deleting some vestigal " + deletes.size() + " rows of " + tableName + + " from " + TableName.META_TABLE_NAME); + metaTable.delete(deletes); + } + } + } + + protected static void deleteFromMeta(final MasterProcedureEnv env, + final TableName tableName, List<HRegionInfo> regions) throws IOException { + MetaTableAccessor.deleteRegions(env.getMasterServices().getConnection(), regions); + + // Clean any remaining rows for this table. + cleanAnyRemainingRows(env, tableName); + } + + protected static void deleteAssignmentState(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + + // Clean up regions of the table in RegionStates. + LOG.debug("Removing '" + tableName + "' from region states."); + am.getRegionStates().tableDeleted(tableName); + + // If entry for this table states, remove it. + LOG.debug("Marking '" + tableName + "' as deleted."); + am.getTableStateManager().setDeletedTable(tableName); + } + + protected static void deleteTableDescriptorCache(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + LOG.debug("Removing '" + tableName + "' descriptor."); + env.getMasterServices().getTableDescriptors().remove(tableName); + } + + protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName) + throws IOException { + getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName); + } + + private static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env) + throws IOException { + return ProcedureSyncWait.waitFor(env, "quota manager to be available", + new ProcedureSyncWait.Predicate<MasterQuotaManager>() { + @Override + public MasterQuotaManager evaluate() throws IOException { + return env.getMasterServices().getMasterQuotaManager(); + } + }); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java index c2abc7c,0000000..d54dca4 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@@ -1,308 -1,0 +1,304 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.Scan; - import org.apache.hadoop.hbase.regionserver.HMobStore; - import org.apache.hadoop.hbase.regionserver.HStore; - import org.apache.hadoop.hbase.regionserver.InternalScanner; - import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner; - import org.apache.hadoop.hbase.regionserver.ScanType; - import org.apache.hadoop.hbase.regionserver.Store; - import org.apache.hadoop.hbase.regionserver.StoreFile; ++import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; - import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Compact passed set of files in the mob-enabled column family. + */ +@InterfaceAudience.Private +public class DefaultMobCompactor extends DefaultCompactor { + + private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class); + private long mobSizeThreshold; + private HMobStore mobStore; + public DefaultMobCompactor(Configuration conf, Store store) { + super(conf, store); + // The mob cells reside in the mob-enabled column family which is held by HMobStore. + // During the compaction, the compactor reads the cells from the mob files and + // probably creates new mob files. All of these operations are included in HMobStore, + // so we need to cast the Store to HMobStore. + if (!(store instanceof HMobStore)) { + throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); + } + mobStore = (HMobStore) store; + mobSizeThreshold = store.getFamily().getMobThreshold(); + } + + /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + @Override + protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException { + // make this writer with tags always because of possible new cells with tags.
+ StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, + true, fd.maxMVCCReadpoint >= smallestReadPoint, true); + return writer; + } + + @Override + protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + if (scanType == ScanType.COMPACT_DROP_DELETES) { + scanType = ScanType.COMPACT_RETAIN_DELETES; + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, true); + } else { + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, false); + } + } + + // TODO refactor to take advantage of the throughput controller. + + /** + * Performs compaction on a column family with the mob flag enabled. + * This is for when the mob threshold size has changed or if the mob + * column family mode has been toggled via an alter table statement. + * Compacts the files by the following rules. + * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * directly copy the (with mob tag) cell into the new store file. + * </li> + * <li> + * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into + * the new store file. + * </li> + * </ol> + * 2. If the cell doesn't have a reference tag. + * <ol> + * <li> + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * write this cell to a mob file, and write the path of this mob file to the store file. + * </li> + * <li> + * Otherwise, directly write this cell into the store file. + * </li> + * </ol> + * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner + * which could output the normal cells and delete markers together when required. + * After the major compaction on the normal hfiles, we have a guarantee that we have purged all + * deleted or old version mob refs, and the delete markers are written to a del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the + * mob files. When the small mob files are merged into bigger ones, the del file is added into + * the scanner to filter the deleted cells. + * @param fd File details + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point. + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param major Is a major compaction. + * @return Whether compaction ended; false if it was interrupted for any reason. 
+ */ + @Override + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + long smallestReadPoint, boolean cleanSeqId, + CompactionThroughputController throughputController, boolean major) throws IOException { + if (!(scanner instanceof MobCompactionStoreScanner)) { + throw new IllegalArgumentException( + "The scanner should be an instance of MobCompactionStoreScanner"); + } + MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner; + int bytesWritten = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. + List<Cell> cells = new ArrayList<Cell>(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + int closeCheckInterval = HStore.getCloseCheckInterval(); + boolean hasMore; + Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); + byte[] fileName = null; + StoreFile.Writer mobFileWriter = null; + StoreFile.Writer delFileWriter = null; + long mobCells = 0; + long deleteMarkersCount = 0; + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() + .getName()); + long mobCompactedIntoMobCellsCount = 0; + long mobCompactedFromMobCellsCount = 0; + long mobCompactedIntoMobCellsSize = 0; + long mobCompactedFromMobCellsSize = 0; + try { + try { + // If the mob file writer could not be created, directly write the cell to the store file. + mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + } catch (IOException e) { + LOG.error( + "Fail to create mob writer, " + + "we will continue the compaction by writing MOB cells directly in store files", + e); + } + delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); ++ ScannerContext scannerContext = ++ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); ++ ++ + do { - hasMore = compactionScanner.next(cells, compactionKVMax); ++ hasMore = compactionScanner.next(cells, scannerContext); + // output to writer: + for (Cell c : cells) { + if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { + CellUtil.setSequenceId(c, 0); + } + if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) { + delFileWriter.append(c); + deleteMarkersCount++; + } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + // If the mob file writer is null or the kv type is not put, directly write the cell + // to the store file. + writer.append(c); + } else if (MobUtils.isMobReferenceCell(c)) { + if (MobUtils.hasValidMobRefCellValue(c)) { + int size = MobUtils.getMobValueLength(c); + if (size > mobSizeThreshold) { + // If the value size is larger than the threshold, it's regarded as a mob. Since + // its value is already in the mob file, directly write this cell to the store file + writer.append(c); + } else { + // If the value is not larger than the threshold, it's not regarded a mob. Retrieve + // the mob cell from the mob file, and write it back to the store file. 
+ Cell mobCell = mobStore.resolve(c, false); + if (mobCell.getValueLength() != 0) { + // put the mob data back to the store file - // KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell); + CellUtil.setSequenceId(mobCell, c.getSequenceId()); + writer.append(mobCell); + mobCompactedFromMobCellsCount++; + mobCompactedFromMobCellsSize += mobCell.getValueLength(); + } else { + // If the value of a file is empty, there might be issues when retrieving, + // directly write the cell to the store file, and leave it to be handled by the + // next compaction. + writer.append(c); + } + } + } else { + LOG.warn("The value format of the KeyValue " + c + + " is wrong, its length is less than " + Bytes.SIZEOF_INT); + writer.append(c); + } + } else if (c.getValueLength() <= mobSizeThreshold) { + // If the value size of a cell is not larger than the threshold, directly write it to + // the store file. + writer.append(c); + } else { + // If the value size of a cell is larger than the threshold, it's regarded as a mob, + // write this cell to a mob file, and write the path to the store file. + mobCells++; + // append the original keyValue in the mob file. + mobFileWriter.append(c); + KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag); + // write the cell whose value is the path of a mob file to the store file. + writer.append(reference); + mobCompactedIntoMobCellsCount++; + mobCompactedIntoMobCellsSize += c.getValueLength(); + } + ++progress.currentCompactedKVs; + + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { + bytesWritten += KeyValueUtil.length(c); + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } + } + } + } + cells.clear(); + } while (hasMore); + } finally { + if (mobFileWriter != null) { + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); + } + if (delFileWriter != null) { + delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); + delFileWriter.close(); + } + } + if (mobFileWriter != null) { + if (mobCells > 0) { + // If the mob file is not empty, commit it. + mobStore.commitFile(mobFileWriter.getPath(), path); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp mob file", e); + } + } + } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. + // If the commit fails, the compaction is re-performed again. + mobStore.commitFile(delFileWriter.getPath(), path); + } else { + try { + // If the del file is empty, delete it instead of committing. 
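Summarizing the per-cell branching in performCompaction() above: delete markers go to the _del writer, a mob reference whose value is still over the threshold keeps its reference, a reference whose value has shrunk below the threshold is resolved back inline, and a plain oversized value is moved into the mob file with a reference written in its place. The following standalone sketch restates that routing with simplified, hypothetical types (CellView, Target); it leaves out the non-Put and missing-mob-writer fallbacks, which always write straight to the store file.

public class MobCompactionRouting {

  // Hypothetical, simplified view of a cell seen during mob compaction.
  static final class CellView {
    final boolean isDelete;        // delete marker
    final boolean isMobReference;  // value points into an existing mob file
    final int valueLength;         // length of the (referenced or inline) value
    CellView(boolean isDelete, boolean isMobReference, int valueLength) {
      this.isDelete = isDelete;
      this.isMobReference = isMobReference;
      this.valueLength = valueLength;
    }
  }

  enum Target { DEL_FILE, STORE_FILE, MOB_FILE_PLUS_REFERENCE, RESOLVE_THEN_STORE_FILE }

  // Mirrors the branch order in performCompaction(), for a compaction that outputs delete markers.
  static Target route(CellView c, long mobSizeThreshold) {
    if (c.isDelete) {
      return Target.DEL_FILE;  // delete markers are collected in the _del file
    }
    if (c.isMobReference) {
      // still big enough: keep the reference; shrunk below the threshold: pull the value back inline
      return c.valueLength > mobSizeThreshold ? Target.STORE_FILE : Target.RESOLVE_THEN_STORE_FILE;
    }
    // plain cell: big values move out to a mob file and leave a reference behind
    return c.valueLength > mobSizeThreshold ? Target.MOB_FILE_PLUS_REFERENCE : Target.STORE_FILE;
  }

  public static void main(String[] args) {
    long threshold = 100 * 1024;
    System.out.println(route(new CellView(false, false, 5 * 1024 * 1024), threshold)); // MOB_FILE_PLUS_REFERENCE
    System.out.println(route(new CellView(false, true, 10), threshold));               // RESOLVE_THEN_STORE_FILE
    System.out.println(route(new CellView(true, false, 0), threshold));                // DEL_FILE
  }
}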
+ store.getFileSystem().delete(delFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp del file", e); + } + } + } + mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount); + mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount); + mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize); + mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize); + progress.complete(); + return true; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 00b3421,0000000..44387f5 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@@ -1,222 -1,0 +1,222 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; - import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; - import org.apache.hadoop.hbase.regionserver.HMobStore; - import org.apache.hadoop.hbase.regionserver.InternalScanner; - import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; - import org.apache.hadoop.hbase.regionserver.Store; - import org.apache.hadoop.hbase.regionserver.StoreFile; ++import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.util.Bytes; ++import org.apache.hadoop.util.StringUtils; + +/** + * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. + * If the store is not a mob store, the flusher flushes the MemStore the same with + * DefaultStoreFlusher, + * If the store is a mob store, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. 
+ * <ol> + * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li> + * <li>If the size of a cell value is larger than a threshold, it'll be flushed + * to a mob file, another cell with the path of this file will be flushed to HBase.</li> + * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to + * HBase directly.</li> + * </ol> + * + */ +@InterfaceAudience.Private +public class DefaultMobStoreFlusher extends DefaultStoreFlusher { + + private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class); + private final Object flushLock = new Object(); + private long mobCellValueSizeThreshold = 0; + private Path targetPath; + private HMobStore mobStore; + + public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException { + super(conf, store); + mobCellValueSizeThreshold = store.getFamily().getMobThreshold(); + this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), + store.getColumnFamilyName()); + if (!this.store.getFileSystem().exists(targetPath)) { + this.store.getFileSystem().mkdirs(targetPath); + } + this.mobStore = (HMobStore) store; + } + + /** + * Flushes the snapshot of the MemStore. + * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. + * If the store is a mob one, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. + * <ol> + * <li>Cells that are not PUT type or have the delete mark will be directly flushed to + * HBase.</li> + * <li>If the size of a cell value is larger than a threshold, it'll be + * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li> + * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to + * HBase directly.</li> + * </ol> + */ + @Override + public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, + MonitoredTask status) throws IOException { + ArrayList<Path> result = new ArrayList<Path>(); + int cellsCount = snapshot.getCellsCount(); + if (cellsCount == 0) return result; // don't flush if there are no entries + + // Use a store scanner to find which rows to flush. + long smallestReadPoint = store.getSmallestReadPoint(); + InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + if (scanner == null) { + return result; // NULL scanner returned from coprocessor hooks means skip normal processing + } + StoreFile.Writer writer; + try { + // TODO: We can fail in the below block before we complete adding this flush to + // list of store files. Add cleanup of anything put on filesystem if we fail. + synchronized (flushLock) { + status.setStatus("Flushing " + store + ": creating writer"); + // Write the map out to the disk + writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(), + false, true, true); + writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); + try { + // It's a mob store, flush the cells in a mob way. This is the difference of flushing + // between a normal and a mob store.
+ performMobFlush(snapshot, cacheFlushId, scanner, writer, status); + } finally { + finalizeWriter(writer, cacheFlushId, status); + } + } + } finally { + scanner.close(); + } + LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize=" - + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom() - + ", into tmp file " + writer.getPath()); ++ + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) + ++ ", hasBloomFilter=" + writer.hasGeneralBloom() + ++ ", into tmp file " + writer.getPath()); + result.add(writer.getPath()); + return result; + } + + /** + * Flushes the cells in the mob store. + * <ol>In the mob store, the cells with PUT type might have or have no mob tags. + * <li>If a cell does not have a mob tag, flushing the cell to different files depends + * on the value length. If the length is larger than a threshold, it's flushed to a + * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly + * flush the cell to a store file in HBase.</li> + * <li>If a cell have a mob tag, its value is a mob file name, directly flush it + * to a store file in HBase.</li> + * </ol> + * @param snapshot Memstore snapshot. + * @param cacheFlushId Log cache flush sequence number. + * @param scanner The scanner of memstore snapshot. + * @param writer The store file writer. + * @param status Task that represents the flush operation and may be updated with status. + * @throws IOException + */ + protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, + InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException { + StoreFile.Writer mobFileWriter = null; + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + long mobCount = 0; + long mobSize = 0; + long time = snapshot.getTimeRangeTracker().getMaximumTimestamp(); + mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + // the target path is {tableName}/.mob/{cfName}/mobFiles + // the relative path is mobFiles + byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + try { + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() + .getName()); + List<Cell> cells = new ArrayList<Cell>(); + boolean hasMore; ++ ScannerContext scannerContext = ++ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); ++ + do { - hasMore = scanner.next(cells, compactionKVMax); ++ hasMore = scanner.next(cells, scannerContext); + if (!cells.isEmpty()) { + for (Cell c : cells) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to + // disk. + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv) + || kv.getTypeByte() != KeyValue.Type.Put.getCode()) { + writer.append(kv); + } else { + // append the original keyValue in the mob file. + mobFileWriter.append(kv); + mobSize += kv.getValueLength(); + mobCount++; + + // append the tags to the KeyValue. 
+ // The key is same, the value is the filename of the mob file + KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag); + writer.append(reference); + } + } + cells.clear(); + } + } while (hasMore); + } finally { + status.setStatus("Flushing mob file " + store + ": appending metadata"); + mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); + status.setStatus("Flushing mob file " + store + ": closing flushed file"); + mobFileWriter.close(); + } + + if (mobCount > 0) { + // commit the mob file from temp folder to target folder. + // If the mob file is committed successfully but the store file is not, + // the committed mob file will be handled by the sweep tool as an unused + // file. + mobStore.commitFile(mobFileWriter.getPath(), targetPath); + mobStore.updateMobFlushCount(); + mobStore.updateMobFlushedCellsCount(mobCount); + mobStore.updateMobFlushedCellsSize(mobSize); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp mob file", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java index 0778ac1,0000000..718b513 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java @@@ -1,646 -1,0 +1,643 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; - import org.apache.hadoop.hbase.client.HTable; - import org.apache.hadoop.hbase.client.Scan; ++import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId; - import org.apache.hadoop.hbase.regionserver.BloomType; - import org.apache.hadoop.hbase.regionserver.HStore; - import org.apache.hadoop.hbase.regionserver.ScanInfo; - import org.apache.hadoop.hbase.regionserver.ScanType; - import org.apache.hadoop.hbase.regionserver.StoreFile; ++import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; - import org.apache.hadoop.hbase.regionserver.StoreFileInfo; - import org.apache.hadoop.hbase.regionserver.StoreFileScanner; - import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions. 
+ */ +@InterfaceAudience.Private +public class PartitionedMobFileCompactor extends MobFileCompactor { + + private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class); + protected long mergeableSize; + protected int delFileMaxCount; + /** The number of files compacted in a batch */ + protected int compactionBatchSize; + protected int compactionKVMax; + + private Path tempPath; + private Path bulkloadPath; + private CacheConfig compactionCacheConfig; + private Tag tableNameTag; + + public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool) { + super(conf, fs, tableName, column, pool); + mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD); + delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, + MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + // default is 100 + compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE); + tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); + bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( + tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); + compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + compactionCacheConfig = new CacheConfig(copyOfConf); + tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); + } + + @Override + public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException { + if (files == null || files.isEmpty()) { + LOG.info("No candidate mob files"); + return null; + } + LOG.info("isForceAllFiles: " + isForceAllFiles); + // find the files to compact. + PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles); + // compact the files. + return performCompaction(request); + } + + /** + * Selects the compacted mob/del files. + * Iterates the candidates to find out all the del files and small mob files. + * @param candidates All the candidates. + * @param isForceAllFiles Whether add all mob files into the compaction. + * @return A compaction request. + * @throws IOException + */ + protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates, + boolean isForceAllFiles) throws IOException { + Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>(); + Map<CompactionPartitionId, CompactionPartition> filesToCompact = + new HashMap<CompactionPartitionId, CompactionPartition>(); + int selectedFileCount = 0; + int irrelevantFileCount = 0; + for (FileStatus file : candidates) { + if (!file.isFile()) { + irrelevantFileCount++; + continue; + } + // group the del files and small files.
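The select() method above sorts candidates into two buckets: del files are collected separately, and each mob file smaller than the mergeable threshold is grouped into a CompactionPartition keyed by the start key and date encoded in its name (the grouping continues in the loop that follows). A small self-contained sketch of that grouping is given here; PartitionId, MobFile and group() are illustrative stand-ins, not the HBase classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MobPartitionGrouping {

  // Hypothetical stand-in for CompactionPartitionId: files sharing startKey+date compact together.
  static final class PartitionId {
    final String startKey;
    final String date;
    PartitionId(String startKey, String date) { this.startKey = startKey; this.date = date; }
    @Override public boolean equals(Object o) {
      if (!(o instanceof PartitionId)) return false;
      PartitionId other = (PartitionId) o;
      return startKey.equals(other.startKey) && date.equals(other.date);
    }
    @Override public int hashCode() { return Objects.hash(startKey, date); }
    @Override public String toString() { return startKey + "/" + date; }
  }

  // Hypothetical file record: real mob file names encode a start key and a date.
  static final class MobFile {
    final String startKey;
    final String date;
    final long length;
    MobFile(String startKey, String date, long length) {
      this.startKey = startKey; this.date = date; this.length = length;
    }
  }

  // Groups files below the mergeable size into partitions, mirroring select().
  static Map<PartitionId, List<MobFile>> group(List<MobFile> candidates, long mergeableSize) {
    Map<PartitionId, List<MobFile>> partitions = new HashMap<PartitionId, List<MobFile>>();
    for (MobFile f : candidates) {
      if (f.length >= mergeableSize) {
        continue;  // large enough already; only small files are merged
      }
      PartitionId id = new PartitionId(f.startKey, f.date);
      List<MobFile> partition = partitions.get(id);
      if (partition == null) {
        partition = new ArrayList<MobFile>();
        partitions.put(id, partition);
      }
      partition.add(f);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<MobFile> files = Arrays.asList(
        new MobFile("k1", "20150401", 10), new MobFile("k1", "20150401", 20),
        new MobFile("k2", "20150401", 30), new MobFile("k1", "20150402", 2000000));
    for (Map.Entry<PartitionId, List<MobFile>> e : group(files, 1000000).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().size() + " file(s)");
    }
  }
}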
+ FileStatus linkedFile = file; + if (HFileLink.isHFileLink(file.getPath())) { + HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); + linkedFile = getLinkedFileStatus(link); + if (linkedFile == null) { + // If the linked file cannot be found, regard it as an irrelevantFileCount file + irrelevantFileCount++; + continue; + } + } + if (StoreFileInfo.isDelFile(linkedFile.getPath())) { + allDelFiles.add(file); + } else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) { + // add all files if isForceAllFiles is true, + // otherwise add the small files to the merge pool + MobFileName fileName = MobFileName.create(linkedFile.getPath().getName()); + CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(), + fileName.getDate()); + CompactionPartition compactionPartition = filesToCompact.get(id); + if (compactionPartition == null) { + compactionPartition = new CompactionPartition(id); + compactionPartition.addFile(file); + filesToCompact.put(id, compactionPartition); + } else { + compactionPartition.addFile(file); + } + selectedFileCount++; + } + } + PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest( + filesToCompact.values(), allDelFiles); + if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) { + // all the files are selected + request.setCompactionType(CompactionType.ALL_FILES); + } + LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " + + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and " + + irrelevantFileCount + " irrelevant files"); + return request; + } + + /** + * Performs the compaction on the selected files. + * <ol> + * <li>Compacts the del files.</li> + * <li>Compacts the selected small mob files and all the del files.</li> + * <li>If all the candidates are selected, delete the del files.</li> + * </ol> + * @param request The compaction request. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request) + throws IOException { + // merge the del files + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + List<StoreFile> newDelFiles = new ArrayList<StoreFile>(); + for (Path newDelPath : newDelPaths) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + newDelFiles.add(sf); + } + LOG.info("After merging, there are " + newDelFiles.size() + " del files"); + // compact the mob files by partitions. + List<Path> paths = compactMobFiles(request, newDelFiles); + LOG.info("After compaction, there are " + paths.size() + " mob files"); + // archive the del files if all the mob files are selected. + if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { + LOG.info("After a mob file compaction with all files selected, archiving the del files " + + newDelFiles); + try { + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles); + } catch (IOException e) { + LOG.error("Failed to archive the del files " + newDelFiles, e); + } + } + return paths; + } + + /** + * Compacts the selected small mob files and all the del files. + * @param request The compaction request. + * @param delFiles The del files. 
+ * @return The paths of new mob files after compactions. + * @throws IOException + */ + protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request, + final List<StoreFile> delFiles) throws IOException { + Collection<CompactionPartition> partitions = request.compactionPartitions; + if (partitions == null || partitions.isEmpty()) { + LOG.info("No partitions of mob files"); + return Collections.emptyList(); + } + List<Path> paths = new ArrayList<Path>(); - final HTable table = new HTable(conf, tableName); ++ Connection c = ConnectionFactory.createConnection(conf); ++ final Table table = c.getTable(tableName); + try { + Map<CompactionPartitionId, Future<List<Path>>> results = + new HashMap<CompactionPartitionId, Future<List<Path>>>(); + // compact the mob files by partitions in parallel. + for (final CompactionPartition partition : partitions) { + results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() { + @Override + public List<Path> call() throws Exception { + LOG.info("Compacting mob files for partition " + partition.getPartitionId()); + return compactMobFilePartition(request, partition, delFiles, table); + } + })); + } + // compact the partitions in parallel. + boolean hasFailure = false; + for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) { + try { + paths.addAll(result.getValue().get()); + } catch (Exception e) { + // just log the error + LOG.error("Failed to compact the partition " + result.getKey(), e); + hasFailure = true; + } + } + if (hasFailure) { + // if any partition fails in the compaction, directly throw an exception. + throw new IOException("Failed to compact the partitions"); + } + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Failed to close the HTable", e); + } + } + return paths; + } + + /** + * Compacts a partition of selected small mob files and all the del files. + * @param request The compaction request. + * @param partition A compaction partition. + * @param delFiles The del files. + * @param table The current table. + * @return The paths of new mob files after compactions. + * @throws IOException + */ + private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request, - CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException { ++ CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException { + List<Path> newFiles = new ArrayList<Path>(); + List<FileStatus> files = partition.listFiles(); + int offset = 0; + Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString()); + Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString()); + while (offset < files.size()) { + int batch = compactionBatchSize; + if (files.size() - offset < compactionBatchSize) { + batch = files.size() - offset; + } + if (batch == 1 && delFiles.isEmpty()) { + // only one file left and no del files, do not compact it, + // and directly add it to the new files. + newFiles.add(files.get(offset).getPath()); + offset++; + continue; + } + // clean the bulkload directory to avoid loading old files. 
+ fs.delete(bulkloadPathOfPartition, true); + // add the selected mob files and del files into filesToCompact + List<StoreFile> filesToCompact = new ArrayList<StoreFile>(); + for (int i = offset; i < batch + offset; i++) { + StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, + BloomType.NONE); + filesToCompact.add(sf); + } + filesToCompact.addAll(delFiles); + // compact the mob files in a batch. + compactMobFilesInBatch(request, partition, table, filesToCompact, batch, + bulkloadPathOfPartition, bulkloadColumnPath, newFiles); + // move to the next batch. + offset += batch; + } + LOG.info("Compaction is finished. The number of mob files is changed from " + files.size() + + " to " + newFiles.size()); + return newFiles; + } + + /** + * Compacts a partition of selected small mob files and all the del files in a batch. + * @param request The compaction request. + * @param partition A compaction partition. + * @param table The current table. + * @param filesToCompact The files to be compacted. + * @param batch The number of mob files to be compacted in a batch. + * @param bulkloadPathOfPartition The directory where the bulkload column of the current + * partition is saved. + * @param bulkloadColumnPath The directory where the bulkload files of current partition + * are saved. + * @param newFiles The paths of new mob files after compactions. + * @throws IOException + */ + private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request, - CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch, ++ CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch, + Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles) + throws IOException { + // open scanner to the selected mob files and del files. + StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); + // the mob files to be compacted, not include the del files. + List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch); + // Pair(maxSeqId, cellsCount) + Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact); + // open writers for the mob files and new ref store files. + Writer writer = null; + Writer refFileWriter = null; + Path filePath = null; + Path refFilePath = null; + long mobCells = 0; + try { + writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(), + tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId() + .getStartKey(), compactionCacheConfig); + filePath = writer.getPath(); + byte[] fileName = Bytes.toBytes(filePath.getName()); + // create a temp file and open a writer for it in the bulkloadPath + refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo + .getSecond().longValue(), compactionCacheConfig); + refFilePath = refFileWriter.getPath(); + List<Cell> cells = new ArrayList<Cell>(); + boolean hasMore = false; ++ ScannerContext scannerContext = ++ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { - hasMore = scanner.next(cells, compactionKVMax); ++ hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + // TODO remove this after the new code are introduced. + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + // write the mob cell to the mob file. + writer.append(kv); + // write the new reference cell to the store file. 
+ KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag); + refFileWriter.append(reference); + mobCells++; + } + cells.clear(); + } while (hasMore); + } finally { + // close the scanner. + scanner.close(); + // append metadata to the mob file, and close the mob file writer. + closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); + // append metadata and bulkload info to the ref mob file, and close the writer. + closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); + } + if (mobCells > 0) { + // commit mob file + MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + // bulkload the ref file + bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName()); + newFiles.add(new Path(mobFamilyDir, filePath.getName())); + } else { + // remove the new files + // the mob file is empty, delete it instead of committing. + deletePath(filePath); + // the ref file is empty, delete it instead of committing. + deletePath(refFilePath); + } + // archive the old mob files, do not archive the del files. + try { + MobUtils + .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); + } catch (IOException e) { + LOG.error("Failed to archive the files " + mobFilesToCompact, e); + } + } + + /** + * Compacts the del files in batches which avoids opening too many files. + * @param request The compaction request. + * @param delFilePaths + * @return The paths of new del files after merging or the original files if no merging + * is necessary. + * @throws IOException + */ + protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request, + List<Path> delFilePaths) throws IOException { + if (delFilePaths.size() <= delFileMaxCount) { + return delFilePaths; + } + // when there are more del files than the number that is allowed, merge it firstly. + int offset = 0; + List<Path> paths = new ArrayList<Path>(); + while (offset < delFilePaths.size()) { + // get the batch + int batch = compactionBatchSize; + if (delFilePaths.size() - offset < compactionBatchSize) { + batch = delFilePaths.size() - offset; + } + List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>(); + if (batch == 1) { + // only one file left, do not compact it, directly add it to the new files. + paths.add(delFilePaths.get(offset)); + offset++; + continue; + } + for (int i = offset; i < batch + offset; i++) { + batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, + BloomType.NONE)); + } + // compact the del files in a batch. + paths.add(compactDelFilesInBatch(request, batchedDelFiles)); + // move to the next batch. + offset += batch; + } + return compactDelFiles(request, paths); + } + + /** + * Compacts the del file in a batch. + * @param request The compaction request. + * @param delFiles The del files. + * @return The path of new del file after merging. + * @throws IOException + */ + private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request, + List<StoreFile> delFiles) throws IOException { + // create a scanner for the del files. 
+ StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); + Writer writer = null; + Path filePath = null; + try { + writer = MobUtils.createDelFileWriter(conf, fs, column, + MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, + column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig); + filePath = writer.getPath(); + List<Cell> cells = new ArrayList<Cell>(); + boolean hasMore = false; ++ ScannerContext scannerContext = ++ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { - hasMore = scanner.next(cells, compactionKVMax); ++ hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + // TODO remove this after the new code are introduced. + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + writer.append(kv); + } + cells.clear(); + } while (hasMore); + } finally { + scanner.close(); + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the file " + filePath, e); + } + } + } + // commit the new del file + Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + // archive the old del files + try { + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles); + } catch (IOException e) { + LOG.error("Failed to archive the old del files " + delFiles, e); + } + return path; + } + + /** + * Creates a store scanner. + * @param filesToCompact The files to be compacted. + * @param scanType The scan type. + * @return The store scanner. + * @throws IOException + */ + private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType) + throws IOException { + List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false, + null, HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(column.getMaxVersions()); + long ttl = HStore.determineTTLFromFamily(column); + ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L, + HConstants.LATEST_TIMESTAMP); + return scanner; + } + + /** + * Bulkloads the current file. + * @param table The current table. + * @param bulkloadDirectory The path of bulkload directory. + * @param fileName The current file name. + * @throws IOException + */ - private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName) ++ private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName) + throws IOException { + // bulkload the ref file + try { + LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); - bulkload.doBulkLoad(bulkloadDirectory, table); ++ bulkload.doBulkLoad(bulkloadDirectory, (HTable)table); + } catch (Exception e) { + // delete the committed mob file + deletePath(new Path(mobFamilyDir, fileName)); + throw new IOException(e); + } finally { + // delete the bulkload files in bulkloadPath + deletePath(bulkloadDirectory); + } + } + + /** + * Closes the mob file writer. + * @param writer The mob file writer. + * @param maxSeqId Maximum sequence id. + * @param mobCellsCount The number of mob cells. 
+ * @throws IOException + */ + private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount) + throws IOException { + if (writer != null) { + writer.appendMetadata(maxSeqId, false, mobCellsCount); + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the file " + writer.getPath(), e); + } + } + } + + /** + * Closes the ref file writer. + * @param writer The ref file writer. + * @param maxSeqId Maximum sequence id. + * @param bulkloadTime The timestamp at which the bulk load file is created. + * @throws IOException + */ + private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime) + throws IOException { + if (writer != null) { + writer.appendMetadata(maxSeqId, false); + writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e); + } + } + } + + /** + * Gets the max seqId and number of cells of the store files. + * @param storeFiles The store files. + * @return The pair of the max seqId and number of cells of the store files. + * @throws IOException + */ + private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException { + long maxSeqId = 0; + long maxKeyCount = 0; + for (StoreFile sf : storeFiles) { + // the readers will be closed later after the merge. + maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); + byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); + if (count != null) { + maxKeyCount += Bytes.toLong(count); + } + } + return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount)); + } + + /** + * Deletes a file. + * @param path The path of the file to be deleted. + */ + private void deletePath(Path path) { + try { + if (path != null) { + fs.delete(path, true); + } + } catch (IOException e) { + LOG.error("Failed to delete the file " + path, e); + } + } + + private FileStatus getLinkedFileStatus(HFileLink link) throws IOException { + Path[] locations = link.getLocations(); + for (Path location : locations) { + FileStatus file = getFileStatus(location); + if (file != null) { + return file; + } + } + return null; + } + + private FileStatus getFileStatus(Path path) throws IOException { + try { + if (path != null) { + FileStatus file = fs.getFileStatus(path); + return file; + } + } catch (FileNotFoundException e) { + LOG.warn("The file " + path + " can not be found", e); + } + return null; + } +}
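A note for readers of this patch, not part of the commit itself: the sketch below shows how a caller might drive the new compactor end to end. Only the constructor and the compact(List<FileStatus>, boolean) signature are taken from the class above; the table name, column family, candidate path and pool size are made-up placeholders, and the import for PartitionedMobFileCompactor is left out because its package does not appear in this hunk.

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
// plus an import for PartitionedMobFileCompactor; its package is not shown in this hunk

public class MobFileCompactionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    TableName tableName = TableName.valueOf("t1");
    // assumed to be a mob-enabled column family of table t1
    HColumnDescriptor column = new HColumnDescriptor("f");
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      PartitionedMobFileCompactor compactor =
          new PartitionedMobFileCompactor(conf, fs, tableName, column, pool);
      // the caller supplies the candidate mob files, e.g. by listing the mob family
      // directory; the path below is a placeholder
      FileStatus[] candidates = fs.listStatus(new Path("/hbase/mobdir/data/default/t1/f"));
      // isForceAllFiles == true puts every mob file, not only the small ones, into the request
      compactor.compact(Arrays.asList(candidates), true);
    } finally {
      pool.shutdown();
    }
  }
}

Passing true for isForceAllFiles selects every mob file, so select() marks the request as ALL_FILES and performCompaction() can archive the del files once the partitions are done.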

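A second aside, on compactDelFiles(): when there are more del files than delFileMaxCount allows, they are merged in batches of compactionBatchSize and the method recurses on the merged output until few enough remain. The toy program below is my own illustration of that control flow, not HBase code; plain strings stand in for store files and the two constants are example values rather than the configured defaults.

import java.util.ArrayList;
import java.util.List;

public class DelFileBatchingDemo {

  // Example values; stand-ins for the delFileMaxCount and compactionBatchSize settings,
  // not the real HBase defaults.
  static final int DEL_FILE_MAX_COUNT = 3;
  static final int COMPACTION_BATCH_SIZE = 4;

  // Each string stands in for one del file; "merging" a batch yields one new file name.
  static List<String> compactDelFiles(List<String> delFiles) {
    if (delFiles.size() <= DEL_FILE_MAX_COUNT) {
      return delFiles;
    }
    List<String> merged = new ArrayList<String>();
    int offset = 0;
    while (offset < delFiles.size()) {
      int batch = Math.min(COMPACTION_BATCH_SIZE, delFiles.size() - offset);
      if (batch == 1) {
        // a single leftover file is carried over without merging, as in the patch
        merged.add(delFiles.get(offset));
        offset++;
        continue;
      }
      merged.add("merge" + delFiles.subList(offset, offset + batch));
      offset += batch;
    }
    // recurse until at most DEL_FILE_MAX_COUNT files remain
    return compactDelFiles(merged);
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<String>();
    for (int i = 0; i < 10; i++) {
      files.add("del-" + i);
    }
    System.out.println(compactDelFiles(files));
  }
}

With ten inputs and a batch size of four, one pass yields three merged files, which is within the limit of three, so the recursion stops after a single level.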