http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 0000000,d283729..424a39b mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@@ -1,0 -1,897 +1,898 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.security.Key; + import java.security.KeyException; + import java.text.ParseException; + import java.text.SimpleDateFormat; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Date; + import java.util.List; + import java.util.UUID; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.RejectedExecutionException; + import java.util.concurrent.RejectedExecutionHandler; + import java.util.concurrent.SynchronousQueue; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellComparator; + import org.apache.hadoop.hbase.HBaseConfiguration; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.HRegionInfo; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.TableName; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; + import org.apache.hadoop.hbase.backup.HFileArchiver; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.io.HFileLink; + import org.apache.hadoop.hbase.io.compress.Compression; + import org.apache.hadoop.hbase.io.crypto.Cipher; + import org.apache.hadoop.hbase.io.crypto.Encryption; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.io.hfile.HFileContext; + import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; + import org.apache.hadoop.hbase.master.TableLockManager; + import org.apache.hadoop.hbase.master.TableLockManager.TableLock; + import 
org.apache.hadoop.hbase.mob.compactions.MobCompactor; + import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.HStore; + import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.security.EncryptionUtil; + import org.apache.hadoop.hbase.security.User; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.hbase.util.FSUtils; + import org.apache.hadoop.hbase.util.ReflectionUtils; + import org.apache.hadoop.hbase.util.Threads; + + /** + * The mob utilities + */ + @InterfaceAudience.Private + public class MobUtils { + + private static final Log LOG = LogFactory.getLog(MobUtils.class); + + private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT = + new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyyMMdd"); + } + }; + + /** + * Formats a date to a string. + * @param date The date. + * @return The date as a string in the yyyyMMdd format. + */ + public static String formatDate(Date date) { + return LOCAL_FORMAT.get().format(date); + } + + /** + * Parses the string to a date. + * @param dateString A date as a string in the yyyyMMdd format. + * @return A date. + * @throws ParseException + */ + public static Date parseDate(String dateString) throws ParseException { + return LOCAL_FORMAT.get().parse(dateString); + } + + /** + * Whether the current cell is a mob reference cell. + * @param cell The current cell. + * @return True if the cell has a mob reference tag, false if it doesn't. + */ + public static boolean isMobReferenceCell(Cell cell) { + if (cell.getTagsLength() > 0) { + Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(), + TagType.MOB_REFERENCE_TAG_TYPE); + return tag != null; + } + return false; + } + + /** + * Gets the table name tag. + * @param cell The current cell. + * @return The table name tag. + */ + public static Tag getTableNameTag(Cell cell) { + if (cell.getTagsLength() > 0) { + Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(), + TagType.MOB_TABLE_NAME_TAG_TYPE); + return tag; + } + return null; + } + + /** + * Whether the tag list has a mob reference tag. + * @param tags The tag list. + * @return True if the list has a mob reference tag, false if it doesn't. + */ + public static boolean hasMobReferenceTag(List<Tag> tags) { + if (!tags.isEmpty()) { + for (Tag tag : tags) { + if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) { + return true; + } + } + } + return false; + } + + /** + * Indicates whether it's a raw scan. + * The information is set in the attribute "hbase.mob.scan.raw" of scan. + * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file. + * In a raw scan, the scanner directly returns the cell stored in HBase without retrieving the one in + * the mob file. + * @param scan The current scan. + * @return True if it's a raw scan. + */ + public static boolean isRawMobScan(Scan scan) { + byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW); + try { + return raw != null && Bytes.toBoolean(raw); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * Indicates whether it's a reference only scan. + * The information is set in the attribute "hbase.mob.scan.ref.only" of scan. + * If it's a ref only scan, only the cells with a ref tag are returned.
+ * @param scan The current scan. + * @return True if it's a ref only scan. + */ + public static boolean isRefOnlyScan(Scan scan) { + byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY); + try { + return refOnly != null && Bytes.toBoolean(refOnly); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * Indicates whether the scan contains the information of caching blocks. + * The information is set in the attribute "hbase.mob.cache.blocks" of scan. + * @param scan The current scan. + * @return True when the Scan attribute specifies to cache the MOB blocks. + */ + public static boolean isCacheMobBlocks(Scan scan) { + byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS); + try { + return cache != null && Bytes.toBoolean(cache); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * Sets the attribute of caching blocks in the scan. + * + * @param scan + * The current scan. + * @param cacheBlocks + * True, set the attribute of caching blocks into the scan, the scanner with this scan + * caches blocks. + * False, the scanner doesn't cache blocks for this scan. + */ + public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { + scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks)); + } + + /** + * Cleans the expired mob files. + * Cleans the files whose creation date is older than (current - columnFamily.ttl), and + * the minVersions of that column family is 0. + * @param fs The current file system. + * @param conf The current configuration. + * @param tableName The current table name. + * @param columnDescriptor The descriptor of the current column family. + * @param cacheConfig The cacheConfig that disables the block cache. + * @param current The current time. + * @throws IOException + */ + public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName, + HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current) + throws IOException { + long timeToLive = columnDescriptor.getTimeToLive(); + if (Integer.MAX_VALUE == timeToLive) { + // no need to clean, because the TTL is not set. 
+ return; + } + + Date expireDate = new Date(current - timeToLive * 1000); + expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); + LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); + + FileStatus[] stats = null; + Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName); + Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString()); + try { + stats = fs.listStatus(path); + } catch (FileNotFoundException e) { + LOG.warn("Failed to find the mob file " + path, e); + } + if (null == stats) { + // no file found + return; + } + List<StoreFile> filesToClean = new ArrayList<StoreFile>(); + int deletedFileCount = 0; + for (FileStatus file : stats) { + String fileName = file.getPath().getName(); + try { + MobFileName mobFileName = null; + if (!HFileLink.isHFileLink(file.getPath())) { + mobFileName = MobFileName.create(fileName); + } else { + HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); + mobFileName = MobFileName.create(hfileLink.getOriginPath().getName()); + } + Date fileDate = parseDate(mobFileName.getDate()); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking file " + fileName); + } + if (fileDate.getTime() < expireDate.getTime()) { + if (LOG.isDebugEnabled()) { + LOG.debug(fileName + " is an expired file"); + } + filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE)); + } + } catch (Exception e) { + LOG.error("Cannot parse the fileName " + fileName, e); + } + } + if (!filesToClean.isEmpty()) { + try { + removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), + filesToClean); + deletedFileCount = filesToClean.size(); + } catch (IOException e) { + LOG.error("Failed to delete the mob files " + filesToClean, e); + } + } + LOG.info(deletedFileCount + " expired mob files are deleted"); + } + + /** + * Gets the root dir of the mob files. + * It's {HBASE_DIR}/mobdir. + * @param conf The current configuration. + * @return the root dir of the mob file. + */ + public static Path getMobHome(Configuration conf) { + Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); + return new Path(hbaseDir, MobConstants.MOB_DIR_NAME); + } + + /** + * Gets the qualified root dir of the mob files. + * @param conf The current configuration. + * @return The qualified root dir. + * @throws IOException + */ + public static Path getQualifiedMobRootDir(Configuration conf) throws IOException { + Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); + Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME); + FileSystem fs = mobRootDir.getFileSystem(conf); + return mobRootDir.makeQualified(fs); + } + + /** + * Gets the region dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}. + * @param conf The current configuration. + * @param tableName The current table name. + * @return The region dir of the mob files. + */ + public static Path getMobRegionPath(Configuration conf, TableName tableName) { + Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName); + HRegionInfo regionInfo = getMobRegionInfo(tableName); + return new Path(tablePath, regionInfo.getEncodedName()); + } + + /** + * Gets the family dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. + * @param conf The current configuration. + * @param tableName The current table name. + * @param familyName The current family name. 
+ * @return The family dir of the mob files. + */ + public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) { + return new Path(getMobRegionPath(conf, tableName), familyName); + } + + /** + * Gets the family dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. + * @param regionPath The path of mob region which is a dummy one. + * @param familyName The current family name. + * @return The family dir of the mob files. + */ + public static Path getMobFamilyPath(Path regionPath, String familyName) { + return new Path(regionPath, familyName); + } + + /** + * Gets the HRegionInfo of the mob files. + * This is a dummy region. The mob files are not saved in a region in HBase. + * This is only used in mob snapshot. It's internally used only. + * @param tableName + * @return A dummy mob region info. + */ + public static HRegionInfo getMobRegionInfo(TableName tableName) { + HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES, + HConstants.EMPTY_END_ROW, false, 0); + return info; + } + + /** + * Gets whether the current HRegionInfo is a mob one. + * @param regionInfo The current HRegionInfo. + * @return If true, the current HRegionInfo is a mob one. + */ + public static boolean isMobRegionInfo(HRegionInfo regionInfo) { + return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName() + .equals(regionInfo.getEncodedName()); + } + + /** + * Gets whether the current region name follows the pattern of a mob region name. + * @param tableName The current table name. + * @param regionName The current region name. + * @return True if the current region name follows the pattern of a mob region name. + */ + public static boolean isMobRegionName(TableName tableName, byte[] regionName) { + return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName()); + } + + /** + * Gets the working directory of the mob compaction. + * @param root The root directory of the mob compaction. + * @param jobName The current job name. + * @return The directory of the mob compaction for the current job. + */ + public static Path getCompactionWorkingPath(Path root, String jobName) { + return new Path(root, jobName); + } + + /** + * Archives the mob files. + * @param conf The current configuration. + * @param fs The current file system. + * @param tableName The table name. + * @param tableDir The table directory. + * @param family The name of the column family. + * @param storeFiles The files to be deleted. + * @throws IOException + */ + public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName, + Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException { + HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, + storeFiles); + } + + /** + * Creates a mob reference KeyValue. + * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. + * @param cell The original Cell. + * @param fileName The mob file name where the mob reference KeyValue is written. + * @param tableNameTag The tag of the current table name. It's very important in + * cloning the snapshot. + * @return The mob reference KeyValue. + */ + public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) { + // Append the tags to the KeyValue. 
+ // The key is the same; the value is the filename of the mob file + List<Tag> tags = new ArrayList<Tag>(); + // Add the ref tag as the 1st one. + tags.add(MobConstants.MOB_REF_TAG); + // Add the tag of the source table name; this table is where this mob file is flushed + // from. + // It's very useful in cloning the snapshot. When reading from the cloned table, we need to + // find the original mob files by this table name. For details please see cloning + // snapshot for mob files. + tags.add(tableNameTag); + // Add the existing tags. + tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength())); + int valueLength = cell.getValueLength(); + byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName); + KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags); + reference.setSequenceId(cell.getSequenceId()); + return reference; + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string in the yyyyMMdd format. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The hex string of the start key. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @return The writer for the mob file. + * @throws IOException + */ + public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, + Encryption.Context cryptoContext) + throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() + .replaceAll("-", "")); + return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, + cacheConfig, cryptoContext); + } + + /** + * Creates a writer for the ref file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @return The writer for the mob file.
+ * @throws IOException + */ + public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, + Encryption.Context cryptoContext) + throws IOException { + HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true) + .withIncludesTags(true).withCompression(family.getCompactionCompression()) + .withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize()) + .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime()) + .build(); + Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")); + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath) + .withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType()) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string in the yyyyMMdd format. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @return The writer for the mob file. + * @throws IOException + */ + public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, + Encryption.Context cryptoContext) + throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() + .replaceAll("-", "")); + return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, + cacheConfig, cryptoContext); + } + + /** + * Creates a writer for the del file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string in the yyyyMMdd format. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @return The writer for the del file.
+ * @throws IOException + */ + public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, + Encryption.Context cryptoContext) + throws IOException { + String suffix = UUID + .randomUUID().toString().replaceAll("-", "") + "_del"; + MobFileName mobFileName = MobFileName.create(startKey, date, suffix); + return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, + cacheConfig, cryptoContext); + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @return The writer for the mob file. + * @throws IOException + */ + private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, + Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext) + throws IOException { + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withIncludesMvcc(true).withIncludesTags(true) + .withCompressTags(family.isCompressTags()) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize()) + .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(cryptoContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); + + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs) + .withFilePath(new Path(basePath, mobFileName.getFileName())) + .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Commits the mob file. + * @param conf The current configuration. + * @param fs The current file system. + * @param sourceFile The path where the mob file is saved. + * @param targetPath The directory path where the source file is renamed to. + * @param cacheConfig The current cache config. + * @return The target file path the source file is renamed to. + * @throws IOException + */ + public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile, + Path targetPath, CacheConfig cacheConfig) throws IOException { + if (sourceFile == null) { + return null; + } + Path dstPath = new Path(targetPath, sourceFile.getName()); + validateMobFile(conf, fs, sourceFile, cacheConfig); + String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; + LOG.info(msg); + Path parent = dstPath.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + if (!fs.rename(sourceFile, dstPath)) { + throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); + } + return dstPath; + } + + /** + * Validates a mob file by opening and closing it. + * @param conf The current configuration. + * @param fs The current file system. + * @param path The path where the mob file is saved. + * @param cacheConfig The current cache config. 
+ */ + private static void validateMobFile(Configuration conf, FileSystem fs, Path path, + CacheConfig cacheConfig) throws IOException { + StoreFile storeFile = null; + try { + storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE); + storeFile.createReader(); + } catch (IOException e) { + LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeReader(false); + } + } + } + + /** + * Indicates whether the current mob ref cell has a valid value. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return True if the cell has a valid value. + */ + public static boolean hasValidMobRefCellValue(Cell cell) { + return cell.getValueLength() > Bytes.SIZEOF_INT; + } + + /** + * Gets the mob value length from the mob ref cell. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return The real mob value length. + */ + public static int getMobValueLength(Cell cell) { + return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT); + } + + /** + * Gets the mob file name from the mob ref cell. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return The mob file name. + */ + public static String getMobFileName(Cell cell) { + return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT, + cell.getValueLength() - Bytes.SIZEOF_INT); + } + + /** + * Gets the table name used in the table lock. + * The table lock name is a dummy one; it's not a table name. It's tableName + ".mobLock". + * @param tn The table name. + * @return The table name used in table lock. + */ + public static TableName getTableLockName(TableName tn) { + byte[] tableName = tn.getName(); + return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX)); + } + + /** + * Performs the mob compaction. + * @param conf the Configuration + * @param fs the file system + * @param tableName the table to compact + * @param hcd the column descriptor + * @param pool the thread pool + * @param tableLockManager the table lock manager + * @param allFiles Whether to add all mob files into the compaction. + */ + public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager, + boolean allFiles) throws IOException { + String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, + PartitionedMobCompactor.class.getName()); + // instantiate the mob compactor.
+ MobCompactor compactor = null; + try { + compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class, + ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool }); + } catch (Exception e) { + throw new IOException("Unable to load configured mob file compactor '" + className + "'", e); + } + // compact only for mob-enabled column. + // obtain a write table lock before performing compaction to avoid race condition + // with major compaction in mob-enabled column. + boolean tableLocked = false; + TableLock lock = null; + try { + // the tableLockManager might be null in testing. In that case, it is lock-free. + if (tableLockManager != null) { + lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName), + "Run MobCompactor"); + lock.acquire(); + } + tableLocked = true; + compactor.compact(allFiles); + } catch (Exception e) { + LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString() + + " in the table " + tableName.getNameAsString(), e); + } finally { + if (lock != null && tableLocked) { + try { + lock.release(); + } catch (IOException e) { + LOG.error( + "Failed to release the write lock for the table " + tableName.getNameAsString(), e); + } + } + } + } + + /** + * Creates a thread pool. + * @param conf the Configuration + * @return A thread pool. + */ + public static ExecutorService createMobCompactorThreadPool(Configuration conf) { + int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX, + MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); + if (maxThreads == 0) { + maxThreads = 1; + } + final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue, + Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); + ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + return pool; + } + + /** + * Creates the encryption context. + * @param conf The current configuration. + * @param family The current column descriptor. + * @return The encryption context. + * @throws IOException + */ + public static Encryption.Context createEncryptionContext(Configuration conf, + HColumnDescriptor family) throws IOException { + // TODO the code is repeated, and needs to be unified.
+ Encryption.Context cryptoContext = Encryption.Context.NONE; + String cipherName = family.getEncryptionType(); + if (cipherName != null) { + Cipher cipher; + Key key; + byte[] keyBytes = family.getEncryptionKey(); + if (keyBytes != null) { + // Family provides specific key material + String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User + .getCurrent().getShortName()); + try { + // First try the master key + key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes); + } catch (KeyException e) { + // If the current master key fails to unwrap, try the alternate, if + // one is configured + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'"); + } + String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY); + if (alternateKeyName != null) { + try { + key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes); + } catch (KeyException ex) { + throw new IOException(ex); + } + } else { + throw new IOException(e); + } + } + // Use the algorithm the key wants + cipher = Encryption.getCipher(conf, key.getAlgorithm()); + if (cipher == null) { + throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available"); + } + // Fail if misconfigured + // We use the encryption type specified in the column schema as a sanity check on + // what the wrapped key is telling us + if (!cipher.getName().equalsIgnoreCase(cipherName)) { + throw new RuntimeException("Encryption for family '" + family.getNameAsString() + + "' configured with type '" + cipherName + "' but key specifies algorithm '" + + cipher.getName() + "'"); + } + } else { + // Family does not provide key material, create a random key + cipher = Encryption.getCipher(conf, cipherName); + if (cipher == null) { + throw new RuntimeException("Cipher '" + cipherName + "' is not available"); + } + key = cipher.getRandomKey(); + } + cryptoContext = Encryption.newContext(conf); + cryptoContext.setCipher(cipher); + cryptoContext.setKey(key); + } + return cryptoContext; + } + + /** + * Checks whether this table has mob-enabled columns. + * @param htd The current table descriptor. + * @return Whether this table has mob-enabled columns. + */ + public static boolean hasMobColumns(HTableDescriptor htd) { + HColumnDescriptor[] hcds = htd.getColumnFamilies(); + for (HColumnDescriptor hcd : hcds) { + if (hcd.isMobEnabled()) { + return true; + } + } + return false; + } + + /** + * Indicates whether to return a null value when the mob file is missing or corrupt. + * The information is set in the attribute "empty.value.on.mobcell.miss" of scan. + * @param scan The current scan. + * @return True if the readEmptyValueOnMobCellMiss is enabled. + */ + public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) { - byte[] readEmptyValueOnMobCellMiss = scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); ++ byte[] readEmptyValueOnMobCellMiss = ++ scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); + try { + return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss); + } catch (IllegalArgumentException e) { + return false; + } + } - ++ + /** - * Archive mob store files ++ * Archives mob store files + * @param conf The current configuration. + * @param fs The current file system. + * @param mobRegionInfo The mob family region info. + * @param mobFamilyDir The mob family directory. + * @param family The name of the column family.
+ * @throws IOException + */ + public static void archiveMobStoreFiles(Configuration conf, FileSystem fs, + HRegionInfo mobRegionInfo, Path mobFamilyDir, byte[] family) throws IOException { + // disable the block cache. + Configuration copyOfConf = HBaseConfiguration.create(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CacheConfig cacheConfig = new CacheConfig(copyOfConf); + FileStatus[] fileStatus = FSUtils.listStatus(fs, mobFamilyDir); + List<StoreFile> storeFileList = new ArrayList<StoreFile>(); + for (FileStatus file : fileStatus) { + storeFileList.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE)); + } + HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, family, storeFileList); + } + }
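The reference value written by createMobRefKeyValue above and parsed by getMobValueLength and getMobFileName is simply a 4-byte big-endian int holding the real mob value length, followed by the mob file name. Below is a minimal, dependency-free Java sketch of that layout; the class and method names are illustrative only, not HBase APIs, and java.nio stands in for HBase's Bytes utility (both use big-endian ints, matching Bytes.toBytes(int)).

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MobRefValueSketch {

  // Encodes length + fileName, mirroring the refValue built in createMobRefKeyValue.
  static byte[] encode(int mobValueLength, String mobFileName) {
    byte[] name = mobFileName.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + name.length) // 4 == Bytes.SIZEOF_INT
        .putInt(mobValueLength)                 // big-endian, like Bytes.toBytes(int)
        .put(name)
        .array();
  }

  // Mirrors MobUtils.getMobValueLength: the first 4 bytes are the real value length.
  static int decodeValueLength(byte[] refValue) {
    return ByteBuffer.wrap(refValue).getInt();
  }

  // Mirrors MobUtils.getMobFileName: everything after the 4-byte length prefix.
  static String decodeFileName(byte[] refValue) {
    return new String(refValue, 4, refValue.length - 4, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] ref = encode(1048576, "0123456789abcdef0123456789abcdef20150101deadbeef");
    System.out.println(decodeValueLength(ref)); // 1048576
    System.out.println(decodeFileName(ref));
  }
}

This layout is also why hasValidMobRefCellValue requires the value length to exceed Bytes.SIZEOF_INT: at least one byte of file name must follow the 4-byte length prefix.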
http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java index 0000000,5d162b4..08865ee mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java @@@ -1,0 -1,64 +1,64 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.compactions; + -import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + + /** + * The compaction request for mob files. + */ + @InterfaceAudience.Private + public abstract class MobCompactionRequest { + + protected long selectionTime; + protected CompactionType type = CompactionType.PART_FILES; + + public void setCompactionType(CompactionType type) { + this.type = type; + } + + /** + * Gets the selection time. + * @return The selection time. + */ + public long getSelectionTime() { + return this.selectionTime; + } + + /** + * Gets the compaction type. + * @return The compaction type. + */ + public CompactionType getCompactionType() { + return type; + } + + protected enum CompactionType { + + /** + * Part of the mob files are selected. + */ + PART_FILES, + + /** + * All of the mob files are selected. + */ + ALL_FILES; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java index 0000000,156c6f6..77de0cd mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java @@@ -1,0 -1,90 +1,90 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.compactions; + + import java.io.IOException; + import java.util.Arrays; + import java.util.List; + import java.util.concurrent.ExecutorService; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.util.FSUtils; + + /** + * A mob compactor to directly compact the mob files. + */ + @InterfaceAudience.Private + public abstract class MobCompactor { + + protected FileSystem fs; + protected Configuration conf; + protected TableName tableName; + protected HColumnDescriptor column; + + protected Path mobTableDir; + protected Path mobFamilyDir; + protected ExecutorService pool; + + public MobCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool) { + this.conf = conf; + this.fs = fs; + this.tableName = tableName; + this.column = column; + this.pool = pool; + mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString()); + } + + /** + * Compacts the mob files for the current column family. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public List<Path> compact() throws IOException { + return compact(false); + } + + /** + * Compacts the mob files by compaction type for the current column family. + * @param allFiles Whether to add all mob files into the compaction. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public List<Path> compact(boolean allFiles) throws IOException { + return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles); + } + + /** + * Compacts the candidate mob files. + * @param files The candidate mob files. + * @param allFiles Whether to add all mob files into the compaction. + * @return The paths of new mob files generated in the compaction.
+ * @throws IOException + */ + public abstract List<Path> compact(List<FileStatus> files, boolean allFiles) + throws IOException; + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java index 0000000,af1eb4a..227f1e4 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java @@@ -1,0 -1,146 +1,146 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.compactions; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.List; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + + /** + * An implementation of {@link MobCompactionRequest} that is used in + * {@link PartitionedMobCompactor}. + * The mob files that have the same start key and date in their names belong to + * the same partition. + */ + @InterfaceAudience.Private + public class PartitionedMobCompactionRequest extends MobCompactionRequest { + + protected Collection<FileStatus> delFiles; + protected Collection<CompactionPartition> compactionPartitions; + + public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions, + Collection<FileStatus> delFiles) { + this.selectionTime = EnvironmentEdgeManager.currentTime(); + this.compactionPartitions = compactionPartitions; + this.delFiles = delFiles; + } + + /** + * Gets the compaction partitions. + * @return The compaction partitions. + */ + public Collection<CompactionPartition> getCompactionPartitions() { + return this.compactionPartitions; + } + + /** + * Gets the del files. + * @return The del files. + */ + public Collection<FileStatus> getDelFiles() { + return this.delFiles; + } + + /** + * The partition in the mob compaction. + * The mob files that have the same start key and date in their names belong to + * the same partition. 
+ */ + protected static class CompactionPartition { + private List<FileStatus> files = new ArrayList<FileStatus>(); + private CompactionPartitionId partitionId; + + public CompactionPartition(CompactionPartitionId partitionId) { + this.partitionId = partitionId; + } + + public CompactionPartitionId getPartitionId() { + return this.partitionId; + } + + public void addFile(FileStatus file) { + files.add(file); + } + + public List<FileStatus> listFiles() { + return Collections.unmodifiableList(files); + } + } + + /** + * The partition id that consists of the start key and date of the mob file name. + */ + public static class CompactionPartitionId { + + private String startKey; + private String date; + + public CompactionPartitionId(String startKey, String date) { + if (startKey == null || date == null) { + throw new IllegalArgumentException("Neither the start key nor the date can be null"); + } + this.startKey = startKey; + this.date = date; + } + + public String getStartKey() { + return this.startKey; + } + + public String getDate() { + return this.date; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + startKey.hashCode(); + result = 31 * result + date.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CompactionPartitionId)) { + return false; + } + CompactionPartitionId another = (CompactionPartitionId) obj; + if (!this.startKey.equals(another.startKey)) { + return false; + } + if (!this.date.equals(another.date)) { + return false; + } + return true; + } + + @Override + public String toString() { + return new StringBuilder(startKey).append(date).toString(); + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 0000000,6c2ff01..19137c4 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@@ -1,0 -1,679 +1,679 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + package org.apache.hadoop.hbase.mob.compactions; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Map.Entry; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Future; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellComparator; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.TableName; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Connection; + import org.apache.hadoop.hbase.client.ConnectionFactory; + import org.apache.hadoop.hbase.client.HTable; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.client.Table; + import org.apache.hadoop.hbase.io.HFileLink; + import org.apache.hadoop.hbase.io.crypto.Encryption; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; + import org.apache.hadoop.hbase.mob.MobConstants; + import org.apache.hadoop.hbase.mob.MobFileName; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; + import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; + import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.HStore; + import org.apache.hadoop.hbase.regionserver.ScanInfo; + import org.apache.hadoop.hbase.regionserver.ScanType; + import org.apache.hadoop.hbase.regionserver.ScannerContext; + import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; + import org.apache.hadoop.hbase.regionserver.StoreFileInfo; + import org.apache.hadoop.hbase.regionserver.StoreFileScanner; + import org.apache.hadoop.hbase.regionserver.StoreScanner; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.Pair; + + /** + * An implementation of {@link MobCompactor} that compacts the mob files in partitions. 
+ */ + @InterfaceAudience.Private + public class PartitionedMobCompactor extends MobCompactor { + + private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class); + protected long mergeableSize; + protected int delFileMaxCount; + /** The number of files compacted in a batch */ + protected int compactionBatchSize; + protected int compactionKVMax; + + private Path tempPath; + private Path bulkloadPath; + private CacheConfig compactionCacheConfig; + private Tag tableNameTag; + private Encryption.Context cryptoContext = Encryption.Context.NONE; + + public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool) throws IOException { + super(conf, fs, tableName, column, pool); + mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, + MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + // default is 100 + compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); + bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( + tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); + compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + compactionCacheConfig = new CacheConfig(copyOfConf); + tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); + cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column); + } + + @Override + public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException { + if (files == null || files.isEmpty()) { + LOG.info("No candidate mob files"); + return null; + } + LOG.info("is allFiles: " + allFiles); + // find the files to compact. + PartitionedMobCompactionRequest request = select(files, allFiles); + // compact the files. + return performCompaction(request); + } + + /** + * Selects the mob/del files to compact. + * Iterates over the candidates to find all the del files and small mob files. + * @param candidates All the candidates. + * @param allFiles Whether to add all mob files into the compaction. + * @return A compaction request. + * @throws IOException + */ + protected PartitionedMobCompactionRequest select(List<FileStatus> candidates, + boolean allFiles) throws IOException { + Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>(); + Map<CompactionPartitionId, CompactionPartition> filesToCompact = + new HashMap<CompactionPartitionId, CompactionPartition>(); + int selectedFileCount = 0; + int irrelevantFileCount = 0; + for (FileStatus file : candidates) { + if (!file.isFile()) { + irrelevantFileCount++; + continue; + } + // group the del files and small files.
+ FileStatus linkedFile = file; + if (HFileLink.isHFileLink(file.getPath())) { + HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); + linkedFile = getLinkedFileStatus(link); + if (linkedFile == null) { + // If the linked file cannot be found, regard it as an irrelevant file + irrelevantFileCount++; + continue; + } + } + if (StoreFileInfo.isDelFile(linkedFile.getPath())) { + allDelFiles.add(file); + } else if (allFiles || linkedFile.getLen() < mergeableSize) { + // add all files if allFiles is true, + // otherwise add the small files to the merge pool + MobFileName fileName = MobFileName.create(linkedFile.getPath().getName()); + CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(), + fileName.getDate()); + CompactionPartition compactionPartition = filesToCompact.get(id); + if (compactionPartition == null) { + compactionPartition = new CompactionPartition(id); + compactionPartition.addFile(file); + filesToCompact.put(id, compactionPartition); + } else { + compactionPartition.addFile(file); + } + selectedFileCount++; + } + } + PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest( + filesToCompact.values(), allDelFiles); + if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) { + // all the files are selected + request.setCompactionType(CompactionType.ALL_FILES); + } + LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " + + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and " + + irrelevantFileCount + " irrelevant files"); + return request; + } + + /** + * Performs the compaction on the selected files. + * <ol> + * <li>Compacts the del files.</li> + * <li>Compacts the selected small mob files and all the del files.</li> + * <li>If all the candidates are selected, delete the del files.</li> + * </ol> + * @param request The compaction request. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + protected List<Path> performCompaction(PartitionedMobCompactionRequest request) + throws IOException { + // merge the del files + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + List<StoreFile> newDelFiles = new ArrayList<StoreFile>(); + List<Path> paths = null; + try { + for (Path newDelPath : newDelPaths) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + // pre-create reader of a del file to avoid race condition when opening the reader in each + // partition. + sf.createReader(); + newDelFiles.add(sf); + } + LOG.info("After merging, there are " + newDelFiles.size() + " del files"); + // compact the mob files by partitions. + paths = compactMobFiles(request, newDelFiles); + LOG.info("After compaction, there are " + paths.size() + " mob files"); + } finally { + closeStoreFileReaders(newDelFiles); + } + // archive the del files if all the mob files are selected.
+     if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+       LOG.info("After a mob compaction with all files selected, archiving the del files "
+         + newDelPaths);
+       try {
+         MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+       } catch (IOException e) {
+         LOG.error("Failed to archive the del files " + newDelPaths, e);
+       }
+     }
+     return paths;
+   }
+
+   /**
+    * Compacts the selected small mob files and all the del files.
+    * @param request The compaction request.
+    * @param delFiles The del files.
+    * @return The paths of new mob files after compactions.
+    * @throws IOException
+    */
+   protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
+     final List<StoreFile> delFiles) throws IOException {
+     Collection<CompactionPartition> partitions = request.compactionPartitions;
+     if (partitions == null || partitions.isEmpty()) {
+       LOG.info("No partitions of mob files");
+       return Collections.emptyList();
+     }
+     List<Path> paths = new ArrayList<Path>();
+     Connection c = ConnectionFactory.createConnection(conf);
+     final Table table = c.getTable(tableName);
+     try {
+       Map<CompactionPartitionId, Future<List<Path>>> results =
+         new HashMap<CompactionPartitionId, Future<List<Path>>>();
+       // compact the mob files by partitions in parallel.
+       for (final CompactionPartition partition : partitions) {
+         results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+           @Override
+           public List<Path> call() throws Exception {
+             LOG.info("Compacting mob files for partition " + partition.getPartitionId());
+             return compactMobFilePartition(request, partition, delFiles, table);
+           }
+         }));
+       }
+       // wait for each partition's compaction to finish and collect the results.
+       List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
+       for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+         try {
+           paths.addAll(result.getValue().get());
+         } catch (Exception e) {
+           // just log the error
+           LOG.error("Failed to compact the partition " + result.getKey(), e);
+           failedPartitions.add(result.getKey());
+         }
+       }
+       if (!failedPartitions.isEmpty()) {
+         // if any partition fails in the compaction, directly throw an exception.
+         throw new IOException("Failed to compact the partitions " + failedPartitions);
+       }
+     } finally {
+       try {
+         table.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the HTable", e);
+       }
+       try {
+         c.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the Connection", e);
+       }
+     }
+     return paths;
+   }
+
+   /**
+    * Compacts a partition of selected small mob files and all the del files.
+    * @param request The compaction request.
+    * @param partition A compaction partition.
+    * @param delFiles The del files.
+    * @param table The current table.
+    * @return The paths of new mob files after compactions.
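    * <p>A worked sketch with assumed values: with compactionBatchSize = 100 and a
    * partition holding 250 files, the loop runs batches of 100, 100 and 50; a final
    * batch of a single file with no del files present is not rewritten but carried
    * over as-is.</p>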
+    * @throws IOException
+    */
+   private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
+     CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
+     List<Path> newFiles = new ArrayList<Path>();
+     List<FileStatus> files = partition.listFiles();
+     int offset = 0;
+     Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+     Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+     while (offset < files.size()) {
+       int batch = compactionBatchSize;
+       if (files.size() - offset < compactionBatchSize) {
+         batch = files.size() - offset;
+       }
+       if (batch == 1 && delFiles.isEmpty()) {
+         // only one file left and no del files, do not compact it,
+         // and directly add it to the new files.
+         newFiles.add(files.get(offset).getPath());
+         offset++;
+         continue;
+       }
+       // clean the bulkload directory to avoid loading old files.
+       fs.delete(bulkloadPathOfPartition, true);
+       // add the selected mob files and del files into filesToCompact
+       List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+       for (int i = offset; i < batch + offset; i++) {
+         StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+           BloomType.NONE);
+         filesToCompact.add(sf);
+       }
+       filesToCompact.addAll(delFiles);
+       // compact the mob files in a batch.
+       compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+         bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+       // move to the next batch.
+       offset += batch;
+     }
+     LOG.info("Compaction is finished. The number of mob files changed from " + files.size()
+       + " to " + newFiles.size());
+     return newFiles;
+   }
+
+   /**
+    * Closes the readers of store files.
+    * @param storeFiles The store files to be closed.
+    */
+   private void closeStoreFileReaders(List<StoreFile> storeFiles) {
+     for (StoreFile storeFile : storeFiles) {
+       try {
+         storeFile.closeReader(true);
+       } catch (IOException e) {
+         LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
+       }
+     }
+   }
+
+   /**
+    * Compacts a partition of selected small mob files and all the del files in a batch.
+    * @param request The compaction request.
+    * @param partition A compaction partition.
+    * @param table The current table.
+    * @param filesToCompact The files to be compacted.
+    * @param batch The number of mob files to be compacted in a batch.
+    * @param bulkloadPathOfPartition The directory of the current partition under the
+    *   bulkload working directory.
+    * @param bulkloadColumnPath The directory under the partition directory where the
+    *   bulkload files of the current column family are saved.
+    * @param newFiles The paths of new mob files after compactions.
+    * @throws IOException
+    */
+   private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
+     CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
+     Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+     throws IOException {
+     // open scanner to the selected mob files and del files.
+     StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+     // the mob files to be compacted; this does not include the del files.
+     List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+     // Pair(maxSeqId, cellsCount)
+     Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+     // open writers for the mob files and new ref store files.
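      // A sketch of the two writers opened below: 'writer' produces the new mob file
      // holding the real cell data, while 'refFileWriter' produces an HFile of
      // reference cells that is bulkloaded into the table afterwards. Roughly, a
      // reference cell created by createMobRefKeyValue keeps the original key but
      // carries the new mob file name in its value plus a mob reference tag, so a
      // normal scan can resolve the real value from the mob file later.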
+     Writer writer = null;
+     Writer refFileWriter = null;
+     Path filePath = null;
+     Path refFilePath = null;
+     long mobCells = 0;
+     try {
+       writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+         tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+           .getStartKey(), compactionCacheConfig, cryptoContext);
+       filePath = writer.getPath();
+       byte[] fileName = Bytes.toBytes(filePath.getName());
+       // create a temp file and open a writer for it in the bulkloadPath
+       refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+         .getSecond().longValue(), compactionCacheConfig, cryptoContext);
+       refFilePath = refFileWriter.getPath();
+       List<Cell> cells = new ArrayList<Cell>();
+       boolean hasMore = false;
+       ScannerContext scannerContext =
+         ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+       do {
+         hasMore = scanner.next(cells, scannerContext);
+         for (Cell cell : cells) {
+           // write the mob cell to the mob file.
+           writer.append(cell);
+           // write the new reference cell to the store file.
+           KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
+           refFileWriter.append(reference);
+           mobCells++;
+         }
+         cells.clear();
+       } while (hasMore);
+     } finally {
+       // close the scanner.
+       scanner.close();
+       // append metadata to the mob file, and close the mob file writer.
+       closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+       // append metadata and bulkload info to the ref mob file, and close the writer.
+       closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+     }
+     if (mobCells > 0) {
+       // commit mob file
+       MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+       // bulkload the ref file
+       bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+       newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+     } else {
+       // the mob file is empty, delete it instead of committing.
+       deletePath(filePath);
+       // the ref file is empty, delete it instead of committing.
+       deletePath(refFilePath);
+     }
+     // archive the old mob files, do not archive the del files.
+     try {
+       closeStoreFileReaders(mobFilesToCompact);
+       MobUtils
+         .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+     } catch (IOException e) {
+       LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+     }
+   }
+
+   /**
+    * Compacts the del files in batches, which avoids opening too many files at a time.
+    * @param request The compaction request.
+    * @param delFilePaths The paths of the del files.
+    * @return The paths of new del files after merging, or the original files if no merging
+    *   is necessary.
+    * @throws IOException
+    */
+   protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
+     List<Path> delFilePaths) throws IOException {
+     if (delFilePaths.size() <= delFileMaxCount) {
+       return delFilePaths;
+     }
+     // When there are more del files than the maximum allowed, merge them in batches first.
+     int offset = 0;
+     List<Path> paths = new ArrayList<Path>();
+     while (offset < delFilePaths.size()) {
+       // get the batch
+       int batch = compactionBatchSize;
+       if (delFilePaths.size() - offset < compactionBatchSize) {
+         batch = delFilePaths.size() - offset;
+       }
+       List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+       if (batch == 1) {
+         // only one file left, do not compact it, directly add it to the new files.
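        // A worked sketch of this loop plus the recursion at the end of the method,
        // with assumed values: delFileMaxCount = 3, compactionBatchSize = 100 and
        // 250 del files. The first pass merges batches of 100, 100 and 50 into three
        // files; the recursive call then sees 3 <= delFileMaxCount and returns them.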
+         paths.add(delFilePaths.get(offset));
+         offset++;
+         continue;
+       }
+       for (int i = offset; i < batch + offset; i++) {
+         batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+           BloomType.NONE));
+       }
+       // compact the del files in a batch.
+       paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+       // move to the next batch.
+       offset += batch;
+     }
+     return compactDelFiles(request, paths);
+   }
+
+   /**
+    * Compacts the del files in a batch.
+    * @param request The compaction request.
+    * @param delFiles The del files.
+    * @return The path of the new del file after merging.
+    * @throws IOException
+    */
+   private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
+     List<StoreFile> delFiles) throws IOException {
+     // create a scanner for the del files.
+     StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+     Writer writer = null;
+     Path filePath = null;
+     try {
+       writer = MobUtils.createDelFileWriter(conf, fs, column,
+         MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+         column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
+         cryptoContext);
+       filePath = writer.getPath();
+       List<Cell> cells = new ArrayList<Cell>();
+       boolean hasMore = false;
+       ScannerContext scannerContext =
+         ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+       do {
+         hasMore = scanner.next(cells, scannerContext);
+         for (Cell cell : cells) {
+           writer.append(cell);
+         }
+         cells.clear();
+       } while (hasMore);
+     } finally {
+       scanner.close();
+       if (writer != null) {
+         try {
+           writer.close();
+         } catch (IOException e) {
+           LOG.error("Failed to close the writer of the file " + filePath, e);
+         }
+       }
+     }
+     // commit the new del file
+     Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+     // archive the old del files
+     try {
+       MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+     } catch (IOException e) {
+       LOG.error("Failed to archive the old del files " + delFiles, e);
+     }
+     return path;
+   }
+
+   /**
+    * Creates a store scanner.
+    * @param filesToCompact The files to be compacted.
+    * @param scanType The scan type.
+    * @return The store scanner.
+    * @throws IOException
+    */
+   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+     throws IOException {
+     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+       false, true, false, null, HConstants.LATEST_TIMESTAMP);
+     Scan scan = new Scan();
+     scan.setMaxVersions(column.getMaxVersions());
+     long ttl = HStore.determineTTLFromFamily(column);
+     ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
+     StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+       HConstants.LATEST_TIMESTAMP);
+     return scanner;
+   }
+
+   /**
+    * Bulkloads the current file.
+    * @param table The current table.
+    * @param bulkloadDirectory The path of the bulkload directory.
+    * @param fileName The current file name.
+    * @throws IOException
+    */
+   private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+     throws IOException {
+     // bulkload the ref file
+     try {
+       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+       bulkload.doBulkLoad(bulkloadDirectory, (HTable) table);
+     } catch (Exception e) {
+       // delete the committed mob file
+       deletePath(new Path(mobFamilyDir, fileName));
+       throw new IOException(e);
+     } finally {
+       // delete the bulkload files in bulkloadPath
+       deletePath(bulkloadDirectory);
+     }
+   }
+
+   /**
+    * Closes the mob file writer.
+    * @param writer The mob file writer.
+    * @param maxSeqId Maximum sequence id.
+    * @param mobCellsCount The number of mob cells.
+    * @throws IOException
+    */
+   private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+     throws IOException {
+     if (writer != null) {
+       writer.appendMetadata(maxSeqId, false, mobCellsCount);
+       try {
+         writer.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+       }
+     }
+   }
+
+   /**
+    * Closes the ref file writer.
+    * @param writer The ref file writer.
+    * @param maxSeqId Maximum sequence id.
+    * @param bulkloadTime The timestamp at which the bulkload file is created.
+    * @throws IOException
+    */
+   private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+     throws IOException {
+     if (writer != null) {
+       writer.appendMetadata(maxSeqId, false);
+       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+       writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
+       try {
+         writer.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+       }
+     }
+   }
+
+   /**
+    * Gets the max seqId and number of cells of the store files.
+    * @param storeFiles The store files.
+    * @return The pair of the max seqId and number of cells of the store files.
+    * @throws IOException
+    */
+   private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+     long maxSeqId = 0;
+     long maxKeyCount = 0;
+     for (StoreFile sf : storeFiles) {
+       // the readers will be closed later after the merge.
+       maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+       byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+       if (count != null) {
+         maxKeyCount += Bytes.toLong(count);
+       }
+     }
+     return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+   }
+
+   /**
+    * Deletes a file.
+    * @param path The path of the file to be deleted.
+    */
+   private void deletePath(Path path) {
+     try {
+       if (path != null) {
+         fs.delete(path, true);
+       }
+     } catch (IOException e) {
+       LOG.error("Failed to delete the file " + path, e);
+     }
+   }
+
+   private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+     Path[] locations = link.getLocations();
+     for (Path location : locations) {
+       FileStatus file = getFileStatus(location);
+       if (file != null) {
+         return file;
+       }
+     }
+     return null;
+   }
+
+   private FileStatus getFileStatus(Path path) throws IOException {
+     try {
+       if (path != null) {
+         FileStatus file = fs.getFileStatus(path);
+         return file;
+       }
+     } catch (FileNotFoundException e) {
+       LOG.warn("The file " + path + " cannot be found", e);
+     }
+     return null;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
index 0000000,82d03cd..fdda1de
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@@ -1,0 -1,185 +1,185 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+
+ import java.io.IOException;
+
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.BufferedMutator;
+ import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.io.crypto.Encryption;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+ import org.apache.hadoop.hbase.regionserver.MemStore;
+ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.mapreduce.Reducer.Context;
+
+ /**
+  * The wrapper of a DefaultMemStore.
+  * This wrapper is used in the sweep reducer to buffer and sort the cells written from
+  * the invalid and small mob files.
+  * When it is full it is flushed: the mob data are written to new mob files, and the
+  * names of these mob files are written back to the store files of HBase.
+  * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start
+  * key and date); in each group, the reducer iterates over the files and reads their
+  * cells into a new, bigger mob file.
+  * The cells within one mob file are ordered, but cells across mob files are not, so
+  * this MemStoreWrapper is needed to sort the cells that come from different mob files
+  * before flushing them to disk; when the memstore is big enough, it is flushed as a
+  * new mob file.
+  */
+ @InterfaceAudience.Private
+ public class MemStoreWrapper {
+
+   private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+   private MemStore memstore;
+   private long flushSize;
+   private CompactionPartitionId partitionId;
+   private Context context;
+   private Configuration conf;
+   private BufferedMutator table;
+   private HColumnDescriptor hcd;
+   private Path mobFamilyDir;
+   private FileSystem fs;
+   private CacheConfig cacheConfig;
+   private Encryption.Context cryptoContext = Encryption.Context.NONE;
+
+   public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
+     HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
+     this.memstore = memstore;
+     this.context = context;
+     this.fs = fs;
+     this.table = table;
+     this.hcd = hcd;
+     this.conf = context.getConfiguration();
+     this.cacheConfig = cacheConfig;
+     flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+       MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+     mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+     cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
+   }
+
+   public void setPartitionId(CompactionPartitionId partitionId) {
+     this.partitionId = partitionId;
+   }
+
+   /**
+    * Flushes the memstore if its size is large enough.
+    * @throws IOException
+    */
+   private void flushMemStoreIfNecessary() throws IOException {
+     if (memstore.heapSize() >= flushSize) {
+       flushMemStore();
+     }
+   }
+
+   /**
+    * Flushes the memstore unconditionally.
+    * @throws IOException
+    */
+   public void flushMemStore() throws IOException {
+     MemStoreSnapshot snapshot = memstore.snapshot();
+     internalFlushCache(snapshot);
+     memstore.clearSnapshot(snapshot.getId());
+   }
+
+   /**
+    * Flushes the snapshot of the memstore.
+    * Flushes the mob data to the mob files, and flushes the names of these mob files
+    * to HBase.
+    * @param snapshot The snapshot of the memstore.
+    * @throws IOException
+    */
+   private void internalFlushCache(final MemStoreSnapshot snapshot)
+     throws IOException {
+     if (snapshot.getCellsCount() == 0) {
+       return;
+     }
+     // generate the files into a temp directory.
+     String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+     StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
+       new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
+       partitionId.getStartKey(), cacheConfig, cryptoContext);
+
+     String relativePath = mobFileWriter.getPath().getName();
+     LOG.info("Creating files under the temp directory " + mobFileWriter.getPath().toString());
+
+     byte[] referenceValue = Bytes.toBytes(relativePath);
+     KeyValueScanner scanner = snapshot.getScanner();
+     Cell cell = null;
+     while (null != (cell = scanner.next())) {
+       mobFileWriter.append(cell);
+     }
+     scanner.close();
+     // Write out the log sequence number that corresponds to this output
+     // hfile. The hfile is current up to and including logCacheFlushId.
+     mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
+     mobFileWriter.close();
+
+     MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+     context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+     // write the references/file names back to the store files of HBase.
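      // A sketch of the write-back below: for each cell just flushed into the new mob
      // file, a reference cell is built (same row key, with 'referenceValue' naming
      // the new file) and sent through the BufferedMutator, so the store files of the
      // original table end up pointing at the freshly committed mob file.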
+     scanner = snapshot.getScanner();
+     scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+     cell = null;
+     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
+       .toString()));
+     long updatedCount = 0;
+     while (null != (cell = scanner.next())) {
+       KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+       Put put =
+         new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+       put.add(reference);
+       table.mutate(put);
+       updatedCount++;
+     }
+     table.flush();
+     context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
+     scanner.close();
+   }
+
+   /**
+    * Adds a KeyValue into the memstore.
+    * @param kv The KeyValue to be added.
+    * @throws IOException
+    */
+   public void addToMemstore(KeyValue kv) throws IOException {
+     memstore.add(kv);
+     // flush the memstore if it's full.
+     flushMemStoreIfNecessary();
+   }
+
+ }
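
A minimal driver sketch for the PartitionedMobCompactor above; the table name,
family name and pool size are illustrative assumptions, the standard HBase and
java.util imports are omitted, and the enclosing method is assumed to throw
IOException:

    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    TableName tableName = TableName.valueOf("t1");          // assumed table
    HColumnDescriptor column = new HColumnDescriptor("f");  // assumed mob-enabled family
    ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
    try {
      PartitionedMobCompactor compactor =
          new PartitionedMobCompactor(conf, fs, tableName, column, pool);
      // the candidates are whatever currently sits in the mob family directory.
      Path mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
      List<FileStatus> candidates = Arrays.asList(fs.listStatus(mobFamilyDir));
      // allFiles = true lets select() consider every mob file, which is what makes an
      // ALL_FILES compaction (and del file archival) possible.
      List<Path> newFiles = compactor.compact(candidates, true);
    } finally {
      pool.shutdown();
    }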
