http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index d8b1376,0000000..4e8ccc1
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@@ -1,648 -1,0 +1,648 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The mob utilities.
+ */
+@InterfaceAudience.Private
+public class MobUtils {
+
+  private static final Log LOG = LogFactory.getLog(MobUtils.class);
+
+  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+      new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat("yyyyMMdd");
+    }
+  };
+
+  /**
+   * Formats a date to a string.
+   * @param date The date.
+   * @return The string form of the date, in the yyyyMMdd format.
+   */
+  public static String formatDate(Date date) {
+    return LOCAL_FORMAT.get().format(date);
+  }
+
+  /**
+   * Parses the string to a date.
+   * @param dateString The string form of a date, in the yyyyMMdd format.
+   * @return A date.
+   * @throws ParseException
+   */
+  public static Date parseDate(String dateString) throws ParseException {
+    return LOCAL_FORMAT.get().parse(dateString);
+  }
+
+  /**
+   * Whether the current cell is a mob reference cell.
+   * @param cell The current cell.
+   * @return True if the cell has a mob reference tag, false if it doesn't.
+   */
+  public static boolean isMobReferenceCell(Cell cell) {
+    if (cell.getTagsLength() > 0) {
+      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TagType.MOB_REFERENCE_TAG_TYPE);
+      return tag != null;
+    }
+    return false;
+  }
+
+  /**
+   * Gets the table name tag.
+   * @param cell The current cell.
+   * @return The table name tag.
+   */
+  public static Tag getTableNameTag(Cell cell) {
+    if (cell.getTagsLength() > 0) {
+      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TagType.MOB_TABLE_NAME_TAG_TYPE);
+      return tag;
+    }
+    return null;
+  }
+
+  /**
+   * Whether the tag list has a mob reference tag.
+   * @param tags The tag list.
+   * @return True if the list has a mob reference tag, false if it doesn't.
+   */
+  public static boolean hasMobReferenceTag(List<Tag> tags) {
+    if (!tags.isEmpty()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Indicates whether it's a raw scan.
+   * The information is set in the attribute "hbase.mob.scan.raw" of the scan.
+   * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file;
+   * in a raw scan, the scanner directly returns the cell stored in HBase without retrieving
+   * the one in the mob file.
+   * @param scan The current scan.
+   * @return True if it's a raw scan.
+   */
+  public static boolean isRawMobScan(Scan scan) {
+    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
+    try {
+      return raw != null && Bytes.toBoolean(raw);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Indicates whether it's a reference-only scan.
+   * The information is set in the attribute "hbase.mob.scan.ref.only" of the scan.
+   * If it's a ref-only scan, only the cells with a ref tag are returned.
+   * @param scan The current scan.
+   * @return True if it's a ref-only scan.
+   */
+  public static boolean isRefOnlyScan(Scan scan) {
+    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+    try {
+      return refOnly != null && Bytes.toBoolean(refOnly);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Indicates whether the scan contains the information of caching blocks.
+   * The information is set in the attribute "hbase.mob.cache.blocks" of the scan.
+   * @param scan The current scan.
+   * @return True when the scan attribute specifies to cache the MOB blocks.
+   */
+  public static boolean isCacheMobBlocks(Scan scan) {
+    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
+    try {
+      return cache != null && Bytes.toBoolean(cache);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Sets the attribute of caching blocks in the scan.
+   *
+   * @param scan
+   *          The current scan.
+   * @param cacheBlocks
+   *          True: the scanner with this scan caches blocks.
+   *          False: the scanner doesn't cache blocks for this scan.
+   */
+  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+  }
+
+  /**
+   * Cleans the expired mob files.
+   * Cleans the files whose creation date is older than (current - columnFamily.ttl),
+   * when the minVersions of that column family is 0.
+   * @param fs The current file system.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param columnDescriptor The descriptor of the current column family.
+   * @param cacheConfig The cacheConfig that disables the block cache.
+   * @param current The current time.
+   * @throws IOException
+   */
+  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+      throws IOException {
+    long timeToLive = columnDescriptor.getTimeToLive();
+    if (Integer.MAX_VALUE == timeToLive) {
+      // no need to clean, because the TTL is not set.
+      return;
+    }
+
+    Date expireDate = new Date(current - timeToLive * 1000);
+    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = null;
+    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+    try {
+      stats = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Failed to find the mob file " + path, e);
+    }
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+    int deletedFileCount = 0;
+    for (FileStatus file : stats) {
+      String fileName = file.getPath().getName();
+      try {
+        MobFileName mobFileName = null;
+        if (!HFileLink.isHFileLink(file.getPath())) {
+          mobFileName = MobFileName.create(fileName);
+        } else {
-           HFileLink hfileLink = new HFileLink(conf, file.getPath());
++          HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+        }
+        Date fileDate = parseDate(mobFileName.getDate());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking file " + fileName);
+        }
+        if (fileDate.getTime() < expireDate.getTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(fileName + " is an expired file");
+          }
+          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot parse the fileName " + fileName, e);
+      }
+    }
+    if (!filesToClean.isEmpty()) {
+      try {
+        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+            filesToClean);
+        deletedFileCount = filesToClean.size();
+      } catch (IOException e) {
+        LOG.error("Failed to delete the mob files " + filesToClean, e);
+      }
+    }
+    LOG.info(deletedFileCount + " expired mob files are deleted");
+  }
+
+  /**
+   * Gets the root dir of the mob files.
+   * It's {HBASE_DIR}/mobdir.
+   * @param conf The current configuration.
+   * @return the root dir of the mob file.
+   */
+  public static Path getMobHome(Configuration conf) {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+  }
+
+  /**
+   * Gets the qualified root dir of the mob files.
+   * @param conf The current configuration.
+   * @return The qualified root dir.
+   * @throws IOException
+   */
+  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+    FileSystem fs = mobRootDir.getFileSystem(conf);
+    return mobRootDir.makeQualified(fs);
+  }
+
+  /**
+   * Gets the region dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @return The region dir of the mob files.
+   */
+  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
+    Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
+    HRegionInfo regionInfo = getMobRegionInfo(tableName);
+    return new Path(tablePath, regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    return new Path(getMobRegionPath(conf, tableName), familyName);
+  }
+
+  /**
+   * Gets the family dir of the mob files.
+   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+   * @param regionPath The path of the mob region, which is a dummy one.
+   * @param familyName The current family name.
+   * @return The family dir of the mob files.
+   */
+  public static Path getMobFamilyPath(Path regionPath, String familyName) {
+    return new Path(regionPath, familyName);
+  }
+
+  /**
+   * Gets the HRegionInfo of the mob files.
+   * This is a dummy region; the mob files are not saved in a real region in HBase.
+   * It is used internally only, for mob snapshots.
+   * @param tableName
+   * @return A dummy mob region info.
+   */
+  public static HRegionInfo getMobRegionInfo(TableName tableName) {
+    HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
+        HConstants.EMPTY_END_ROW, false, 0);
+    return info;
+  }
+
+  /**
+   * Gets whether the current HRegionInfo is a mob one.
+   * @param regionInfo The current HRegionInfo.
+   * @return True if the current HRegionInfo is a mob one.
+   */
+  public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+        .equals(regionInfo.getEncodedName());
+  }
+
+  /**
+   * Gets the working directory of the mob compaction.
+   * @param root The root directory of the mob compaction.
+   * @param jobName The current job name.
+   * @return The directory of the mob compaction for the current job.
+   */
+  public static Path getCompactionWorkingPath(Path root, String jobName) {
+    return new Path(root, jobName);
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param tableName The table name.
+   * @param tableDir The table directory.
+   * @param family The name of the column family.
+   * @param storeFiles The files to be deleted.
+   * @throws IOException
+   */
+  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
+        storeFiles);
+  }
+
+  /**
+   * Creates a mob reference KeyValue.
+   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
+   * @param cell The original Cell.
+   * @param fileName The mob file name where the mob reference KeyValue is written.
+   * @param tableNameTag The tag of the current table name. It's very important in
+   *          cloning the snapshot.
+   * @return The mob reference KeyValue.
+   */
+  public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
+    // Append the tags to the KeyValue.
+    // The key is the same; the value is the file name of the mob file.
+    List<Tag> tags = new ArrayList<Tag>();
+    // Add the ref tag as the 1st one.
+    tags.add(MobConstants.MOB_REF_TAG);
+    // Add the tag of the source table name; this table is where this mob file is flushed
+    // from.
+    // It's very useful in cloning the snapshot. When reading from the cloned table, we need to
+    // find the original mob files by this table name. For details please see cloning
+    // snapshots for mob files.
+    tags.add(tableNameTag);
+    // Add the existing tags.
+    tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
+    int valueLength = cell.getValueLength();
+    byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
+    KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+        cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+    reference.setSequenceId(cell.getSequenceId());
+    return reference;
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, in the yyyyMMdd format.
+   * @param basePath The base path for the temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The hex string of the start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
+      throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+        .replaceAll("-", ""));
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+        cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the ref file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param basePath The base path for the temp directory.
+   * @param maxKeyCount The key count.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the ref file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
+      throws IOException {
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
+        .withIncludesTags(true).withCompression(family.getCompactionCompression())
+        .withCompressTags(family.shouldCompressTags()).withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
+        .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
+    Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
+        .withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, in the yyyyMMdd format.
+   * @param basePath The base path for the temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+      throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+        .replaceAll("-", ""));
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+        cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the del file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, in the yyyyMMdd format.
+   * @param basePath The base path for the temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the del file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+      throws IOException {
+    String suffix = UUID
+        .randomUUID().toString().replaceAll("-", "") + "_del";
+    MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+        cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param mobFileName The mob file name.
+   * @param basePath The base path for the temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
+        .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits the mob file.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param sourceFile The path where the mob file is saved.
+   * @param targetPath The directory path where the source file is renamed to.
+   * @param cacheConfig The current cache config.
+   * @return The target file path the source file is renamed to.
+   * @throws IOException
+   */
+  public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+      Path targetPath, CacheConfig cacheConfig) throws IOException {
+    if (sourceFile == null) {
+      return null;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+    LOG.info(msg);
+    Path parent = dstPath.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+    if (!fs.rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+    return dstPath;
+  }
+
+  /**
+   * Validates a mob file by opening and closing it.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param path The path where the mob file is saved.
+   * @param cacheConfig The current cache config.
+   */
+  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+      CacheConfig cacheConfig) throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open mob file[" + path + "], keeping it in the temp directory.", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader(false);
+      }
+    }
+  }
+
+  /**
+   * Indicates whether the current mob ref cell has a valid value.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts: the real mob value length and the
+   * mob file name. The real mob value length takes 4 bytes; the remaining part is the mob
+   * file name.
+   * @param cell The mob ref cell.
+   * @return True if the cell has a valid value.
+   */
+  public static boolean hasValidMobRefCellValue(Cell cell) {
+    return cell.getValueLength() > Bytes.SIZEOF_INT;
+  }
+
+  /**
+   * Gets the mob value length from the mob ref cell.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts: the real mob value length and the mob file name.
+   * The real mob value length takes 4 bytes.
+   * The remaining part is the mob file name.
+   * @param cell The mob ref cell.
+   * @return The real mob value length.
+   */
+  public static int getMobValueLength(Cell cell) {
+    return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
+  }
+
+  /**
+   * Gets the mob file name from the mob ref cell.
+   * A mob ref cell has a mob reference tag.
+   * The value of a mob ref cell consists of two parts: the real mob value length and the
+   * mob file name. The real mob value length takes 4 bytes; the remaining part is the mob
+   * file name.
+   * @param cell The mob ref cell.
+   * @return The mob file name.
+   */
+  public static String getMobFileName(Cell cell) {
+    return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
+        cell.getValueLength() - Bytes.SIZEOF_INT);
+  }
+
+  /**
+   * Gets the table name used in the table lock.
+   * The table lock name is a dummy one; it's not a real table name. It is
+   * tableName + ".mobLock".
+   * @param tn The table name.
+   * @return The table name used in the table lock.
+   */
+  public static TableName getTableLockName(TableName tn) {
+    byte[] tableName = tn.getName();
+    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
+  }
+}
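The reference-value layout shared by createMobRefKeyValue, getMobValueLength, and getMobFileName above is a 4-byte length prefix followed by the mob file name. The following standalone sketch (not part of the patch; the class name and sample values are made up, and it assumes the hbase-common Bytes utility is on the classpath) round-trips such a value with the same Bytes helpers the patch uses:

import org.apache.hadoop.hbase.util.Bytes;

public class MobRefValueExample {

  // Encode a mob reference value: the real value length, then the mob file name,
  // the same concatenation createMobRefKeyValue performs for refValue.
  static byte[] encode(int realValueLength, String mobFileName) {
    return Bytes.add(Bytes.toBytes(realValueLength), Bytes.toBytes(mobFileName));
  }

  // Read back the 4-byte length prefix, as getMobValueLength does.
  static int valueLength(byte[] refValue) {
    return Bytes.toInt(refValue, 0, Bytes.SIZEOF_INT);
  }

  // Read back the file name that follows the prefix, as getMobFileName does.
  static String fileName(byte[] refValue) {
    return Bytes.toString(refValue, Bytes.SIZEOF_INT, refValue.length - Bytes.SIZEOF_INT);
  }

  public static void main(String[] args) {
    byte[] refValue = encode(1024, "0123456789abcdef20150101deadbeef");
    System.out.println(valueLength(refValue)); // prints 1024
    System.out.println(fileName(refValue));    // prints the hypothetical file name
  }
}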
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
index 6cd3172,0000000..d6ad143
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@@ -1,631 -1,0 +1,631 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactor extends MobFileCompactor {
+
+  private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
+  protected long mergeableSize;
+  protected int delFileMaxCount;
+  /** The number of files compacted in a batch */
+  protected int compactionBatchSize;
+  protected int compactionKVMax;
+
+  private Path tempPath;
+  private Path bulkloadPath;
+  private CacheConfig compactionCacheConfig;
+  private Tag tableNameTag;
+
+  public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+      HColumnDescriptor column, ExecutorService pool) {
+    super(conf, fs, tableName, column, pool);
+    mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+        MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+        MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    // default is 100
+    compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+        MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME,
+        tableName.getNameAsString()));
+    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+        HConstants.COMPACTION_KV_MAX_DEFAULT);
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    compactionCacheConfig = new CacheConfig(copyOfConf);
+    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+  }
+
+  @Override
+  public List<Path> compact(List<FileStatus> files) throws IOException {
+    if (files == null || files.isEmpty()) {
+      return null;
+    }
+    // find the files to compact.
+    PartitionedMobFileCompactionRequest request = select(files);
+    // compact the files.
+    return performCompaction(request);
+  }
+
+  /**
+   * Selects the mob/del files to compact.
+   * Iterates over the candidates to find all the del files and small mob files.
+   * @param candidates All the candidates.
+   * @return A compaction request.
+   * @throws IOException
+   */
+  protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
+      throws IOException {
+    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+        new HashMap<CompactionPartitionId, CompactionPartition>();
+    int selectedFileCount = 0;
+    int irrelevantFileCount = 0;
+    for (FileStatus file : candidates) {
+      if (!file.isFile()) {
+        irrelevantFileCount++;
+        continue;
+      }
+      // group the del files and small files.
+      FileStatus linkedFile = file;
+      if (HFileLink.isHFileLink(file.getPath())) {
-         HFileLink link = new HFileLink(conf, file.getPath());
++        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+        linkedFile = getLinkedFileStatus(link);
+        if (linkedFile == null) {
+          // If the linked file cannot be found, regard it as an irrelevant file.
+          irrelevantFileCount++;
+          continue;
+        }
+      }
+      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+        allDelFiles.add(file);
+      } else if (linkedFile.getLen() < mergeableSize) {
+        // add the small files to the merge pool
+        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+            fileName.getDate());
+        CompactionPartition compactionPartition = filesToCompact.get(id);
+        if (compactionPartition == null) {
+          compactionPartition = new CompactionPartition(id);
+          compactionPartition.addFile(file);
+          filesToCompact.put(id, compactionPartition);
+        } else {
+          compactionPartition.addFile(file);
+        }
+        selectedFileCount++;
+      }
+    }
+    PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
+        filesToCompact.values(), allDelFiles);
+    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+      // all the files are selected
+      request.setCompactionType(CompactionType.ALL_FILES);
+    }
+    return request;
+  }
+
+  /**
+   * Performs the compaction on the selected files.
+   * <ol>
+   * <li>Compacts the del files.</li>
+   * <li>Compacts the selected small mob files and all the del files.</li>
+   * <li>If all the candidates are selected, deletes the del files.</li>
+   * </ol>
+   * @param request The compaction request.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+      throws IOException {
+    // merge the del files
+    List<Path> delFilePaths = new ArrayList<Path>();
+    for (FileStatus delFile : request.delFiles) {
+      delFilePaths.add(delFile.getPath());
+    }
+    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+    for (Path newDelPath : newDelPaths) {
+      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+      newDelFiles.add(sf);
+    }
+    // compact the mob files by partitions.
+    List<Path> paths = compactMobFiles(request, newDelFiles);
+    // archive the del files if all the mob files are selected.
+    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+      try {
+        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+      } catch (IOException e) {
+        LOG.error("Failed to archive the del files " + newDelFiles, e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts the selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The paths of new mob files after compaction.
+   * @throws IOException
+   */
+  protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
+      final List<StoreFile> delFiles) throws IOException {
+    Collection<CompactionPartition> partitions = request.compactionPartitions;
+    if (partitions == null || partitions.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<Path> paths = new ArrayList<Path>();
+    final HTable table = new HTable(conf, tableName);
+    try {
+      Map<CompactionPartitionId, Future<List<Path>>> results =
+          new HashMap<CompactionPartitionId, Future<List<Path>>>();
+      // compact the mob files by partitions in parallel.
+      for (final CompactionPartition partition : partitions) {
+        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+          @Override
+          public List<Path> call() throws Exception {
+            return compactMobFilePartition(request, partition, delFiles, table);
+          }
+        }));
+      }
+      // collect the results of the parallel compactions.
+      boolean hasFailure = false;
+      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+        try {
+          paths.addAll(result.getValue().get());
+        } catch (Exception e) {
+          // just log the error
+          LOG.error("Failed to compact the partition " + result.getKey(), e);
+          hasFailure = true;
+        }
+      }
+      if (hasFailure) {
+        // if any partition fails in the compaction, directly throw an exception.
+        throw new IOException("Failed to compact the partitions");
+      }
+    } finally {
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the HTable", e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param delFiles The del files.
+   * @param table The current table.
+   * @return The paths of new mob files after compaction.
+   * @throws IOException
+   */
+  private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
+      CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
+    List<Path> newFiles = new ArrayList<Path>();
+    List<FileStatus> files = partition.listFiles();
+    int offset = 0;
+    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+    while (offset < files.size()) {
+      int batch = compactionBatchSize;
+      if (files.size() - offset < compactionBatchSize) {
+        batch = files.size() - offset;
+      }
+      if (batch == 1 && delFiles.isEmpty()) {
+        // only one file left and no del files; do not compact it,
+        // and directly add it to the new files.
+        newFiles.add(files.get(offset).getPath());
+        offset++;
+        continue;
+      }
+      // clean the bulkload directory to avoid loading old files.
+      fs.delete(bulkloadPathOfPartition, true);
+      // add the selected mob files and del files into filesToCompact
+      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+      for (int i = offset; i < batch + offset; i++) {
+        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+            BloomType.NONE);
+        filesToCompact.add(sf);
+      }
+      filesToCompact.addAll(delFiles);
+      // compact the mob files in a batch.
+      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+          bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+      // move to the next batch.
+      offset += batch;
+    }
+    return newFiles;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files in a batch.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param table The current table.
+   * @param filesToCompact The files to be compacted.
+   * @param batch The number of mob files to be compacted in a batch.
+   * @param bulkloadPathOfPartition The directory where the bulkload files of the current
+   *          partition are saved.
+   * @param bulkloadColumnPath The directory of the column family under the bulkload path
+   *          of the current partition.
+   * @param newFiles The paths of new mob files after compaction.
+   * @throws IOException
+   */
+  private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
+      CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
+      Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+      throws IOException {
+    // open a scanner on the selected mob files and del files.
+    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+    // the mob files to be compacted, not including the del files.
+    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+    // Pair(maxSeqId, cellsCount)
+    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+    // open writers for the mob files and new ref store files.
+    Writer writer = null;
+    Writer refFileWriter = null;
+    Path filePath = null;
+    Path refFilePath = null;
+    long mobCells = 0;
+    try {
+      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+          tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+              .getStartKey(), compactionCacheConfig);
+      filePath = writer.getPath();
+      byte[] fileName = Bytes.toBytes(filePath.getName());
+      // create a temp file and open a writer for it in the bulkloadPath
+      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+          .getSecond().longValue(), compactionCacheConfig);
+      refFilePath = refFileWriter.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        for (Cell cell : cells) {
+          // TODO remove this after the new code is introduced.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+          // write the mob cell to the mob file.
+          writer.append(kv);
+          // write the new reference cell to the store file.
+          KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+          refFileWriter.append(reference);
+          mobCells++;
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      // close the scanner.
+      scanner.close();
+      // append metadata to the mob file, and close the mob file writer.
+      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+      // append metadata and bulkload info to the ref mob file, and close the writer.
+      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+    }
+    if (mobCells > 0) {
+      // commit mob file
+      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+      // bulkload the ref file
+      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+    } else {
+      // remove the new files
+      // the mob file is empty, delete it instead of committing.
+      deletePath(filePath);
+      // the ref file is empty, delete it instead of committing.
+      deletePath(refFilePath);
+    }
+    // archive the old mob files; do not archive the del files.
+    try {
+      MobUtils
+          .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+    }
+  }
+
+  /**
+   * Compacts the del files in batches, which avoids opening too many files.
+   * @param request The compaction request.
+   * @param delFilePaths The paths of the del files.
+   * @return The paths of new del files after merging, or the original files if no merging
+   *         is necessary.
+   * @throws IOException
+   */
+  protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
+      List<Path> delFilePaths) throws IOException {
+    if (delFilePaths.size() <= delFileMaxCount) {
+      return delFilePaths;
+    }
+    // when there are more del files than allowed, merge them first.
+    int offset = 0;
+    List<Path> paths = new ArrayList<Path>();
+    while (offset < delFilePaths.size()) {
+      // get the batch
+      int batch = compactionBatchSize;
+      if (delFilePaths.size() - offset < compactionBatchSize) {
+        batch = delFilePaths.size() - offset;
+      }
+      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+      if (batch == 1) {
+        // only one file left; do not compact it, directly add it to the new files.
+        paths.add(delFilePaths.get(offset));
+        offset++;
+        continue;
+      }
+      for (int i = offset; i < batch + offset; i++) {
+        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+            BloomType.NONE));
+      }
+      // compact the del files in a batch.
+      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+      // move to the next batch.
+      offset += batch;
+    }
+    return compactDelFiles(request, paths);
+  }
+
+  /**
+   * Compacts the del files in a batch.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The path of the new del file after merging.
+   * @throws IOException
+   */
+  private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
+      List<StoreFile> delFiles) throws IOException {
+    // create a scanner for the del files.
+    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+    Writer writer = null;
+    Path filePath = null;
+    try {
+      writer = MobUtils.createDelFileWriter(conf, fs, column,
+          MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+          column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
+      filePath = writer.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        for (Cell cell : cells) {
+          // TODO remove this after the new code is introduced.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+          writer.append(kv);
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      scanner.close();
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error("Failed to close the writer of the file " + filePath, e);
+        }
+      }
+    }
+    // commit the new del file
+    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+    // archive the old del files
+    try {
+      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the old del files " + delFiles, e);
+    }
+    return path;
+  }
+
+  /**
+   * Creates a store scanner.
+   * @param filesToCompact The files to be compacted.
+   * @param scanType The scan type.
+   * @return The store scanner.
+   * @throws IOException
+   */
+  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+      throws IOException {
+    List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
+        null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(column.getMaxVersions());
+    long ttl = HStore.determineTTLFromFamily(column);
+    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+        HConstants.LATEST_TIMESTAMP);
+    return scanner;
+  }
+
+  /**
+   * Bulkloads the current file.
+   * @param table The current table.
+   * @param bulkloadDirectory The path of bulkload directory.
+   * @param fileName The current file name.
+   * @throws IOException
+   */
+  private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
+      throws IOException {
+    // bulkload the ref file
+    try {
+      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.doBulkLoad(bulkloadDirectory, table);
+    } catch (Exception e) {
+      // delete the committed mob file
+      deletePath(new Path(mobFamilyDir, fileName));
+      throw new IOException(e);
+    } finally {
+      // delete the bulkload files in bulkloadPath
+      deletePath(bulkloadDirectory);
+    }
+  }
+
+  /**
+   * Closes the mob file writer.
+   * @param writer The mob file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param mobCellsCount The number of mob cells.
+   * @throws IOException
+   */
+  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+      throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false, mobCellsCount);
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the ref file writer.
+   * @param writer The ref file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param bulkloadTime The timestamp at which the bulk load file is created.
+   * @throws IOException
+   */
+  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+      throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false);
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Gets the max seqId and number of cells of the store files.
+   * @param storeFiles The store files.
+   * @return The pair of the max seqId and number of cells of the store files.
+   * @throws IOException
+   */
+  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+    long maxSeqId = 0;
+    long maxKeyCount = 0;
+    for (StoreFile sf : storeFiles) {
+      // the readers will be closed later after the merge.
+      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+      if (count != null) {
+        maxKeyCount += Bytes.toLong(count);
+      }
+    }
+    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+  }
+
+  /**
+   * Deletes a file.
+   * @param path The path of the file to be deleted.
+   */
+  private void deletePath(Path path) {
+    try {
+      if (path != null) {
+        fs.delete(path, true);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to delete the file " + path, e);
+    }
+  }
+
+  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+    Path[] locations = link.getLocations();
+    for (Path location : locations) {
+      FileStatus file = getFileStatus(location);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+    try {
+      if (path != null) {
+        FileStatus file = fs.getFileStatus(path);
+        return file;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.warn("The file " + path + " cannot be found", e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
index 56e5726,0000000..559d6db
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@@ -1,85 -1,0 +1,87 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of a sweep job.
+ * Takes the rows and their results from the table and maps them to
+ * <filename:Text, mobValue:KeyValue>, where mobValue is the actual cell in HBase.
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+  private ZooKeeperWatcher zkw = null;
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
+    zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
+      tracker.start();
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+    if (zkw != null) {
+      zkw.close();
+    }
+  }
+
+  @Override
+  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+      InterruptedException {
+    if (columns == null) {
+      return;
+    }
-     KeyValue[] kvList = columns.raw();
-     if (kvList == null || kvList.length == 0) {
++    Cell[] cells = columns.rawCells();
++    if (cells == null || cells.length == 0) {
+      return;
+    }
-     for (KeyValue kv : kvList) {
-       if (MobUtils.hasValidMobRefCellValue(kv)) {
-         String fileName = MobUtils.getMobFileName(kv);
-         context.write(new Text(fileName), kv);
++    for (Cell c : cells) {
++      if (MobUtils.hasValidMobRefCellValue(c)) {
++        String fileName = MobUtils.getMobFileName(c);
++        context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
index 0000000,0000000..6b954ac
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
@@@ -1,0 -1,0 +1,54 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hbase.quotas;
++
++import java.io.IOException;
++
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.HRegionInfo;
++
++/**
++ * The listener interface for receiving region state events.
++ */
++@InterfaceAudience.Private
++public interface RegionStateListener {
++
++  /**
++   * Process region split event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionSplit(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region split reverted event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException Signals that an I/O exception has occurred.
++   */
++  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region merge event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionMerged(HRegionInfo hri) throws IOException;
++}
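The RegionStateListener added above is a plain callback contract; this commit ships no implementation alongside it (and the ~HEAD suffix on the new file's path appears to be a leftover merge backup, with the interface proper living at RegionStateListener.java). A hedged sketch of what an implementer could look like, assuming the interface resolves at org.apache.hadoop.hbase.quotas.RegionStateListener as in mainline; the class below only logs each event and is illustrative, not from the commit:

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.quotas.RegionStateListener;

    /** Illustrative only: logs each region state event it receives. */
    public class LoggingRegionStateListener implements RegionStateListener {

      private static final Log LOG = LogFactory.getLog(LoggingRegionStateListener.class);

      @Override
      public void onRegionSplit(HRegionInfo hri) throws IOException {
        LOG.info("Region split: " + hri.getRegionNameAsString());
      }

      @Override
      public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
        LOG.info("Region split reverted: " + hri.getRegionNameAsString());
      }

      @Override
      public void onRegionMerged(HRegionInfo hri) throws IOException {
        LOG.info("Regions merged into: " + hri.getRegionNameAsString());
      }
    }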
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 4afa80c,3c1345d..d55822d --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@@ -62,29 -64,19 +64,30 @@@ public class DefaultStoreEngine extend @Override protected void createComponents( Configuration conf, Store store, KVComparator kvComparator) throws IOException { - storeFileManager = new DefaultStoreFileManager(kvComparator, conf); + createCompactor(conf, store); + createCompactionPolicy(conf, store); + createStoreFlusher(conf, store); ++ storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf()); ++ + } + + protected void createCompactor(Configuration conf, Store store) throws IOException { String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName()); try { compactor = ReflectionUtils.instantiateWithCustomCtor(className, -- new Class[] { Configuration.class, Store.class }, new Object[] { conf, store }); ++ new Class[]{Configuration.class, Store.class}, new Object[]{conf, store}); } catch (Exception e) { throw new IOException("Unable to load configured compactor '" + className + "'", e); } - className = conf.get( - DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); + } + + protected void createCompactionPolicy(Configuration conf, Store store) throws IOException { + String className = conf.get( - DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); ++ DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); try { compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className, -- new Class[] { Configuration.class, StoreConfigInformation.class }, -- new Object[] { conf, store }); ++ new Class[]{Configuration.class, StoreConfigInformation.class}, ++ new Object[]{conf, store}); } catch (Exception e) { throw new IOException("Unable to load configured compaction policy '" + className + "'", e); } @@@ -101,7 -91,7 +104,6 @@@ } } -- @Override public CompactionContext createCompaction() { return new DefaultCompactionContext();
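The split of createComponents() into createCompactor(), createCompactionPolicy(), and createStoreFlusher() keeps the reflection-based plugin loading intact: each component class is read from a configuration key, and the named class must expose the constructor that instantiateWithCustomCtor looks up. A small usage sketch, assuming the public *_CLASS_KEY constants keep their mainline names and visibility; it simply re-selects the stock classes to show the mechanism:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
    import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
    import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;

    public class StoreEngineConfigSketch {

      /** Returns a Configuration naming the store engine components explicitly. */
      public static Configuration configure() {
        Configuration conf = HBaseConfiguration.create();
        // Read by createCompactor(); the class must provide a
        // (Configuration, Store) constructor.
        conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
            DefaultCompactor.class.getName());
        // Read by createCompactionPolicy(); the class must provide a
        // (Configuration, StoreConfigInformation) constructor.
        conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
            RatioBasedCompactionPolicy.class.getName());
        return conf;
      }
    }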
