http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java index 718b513,0000000..f02da48 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java @@@ -1,643 -1,0 +1,636 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; - import org.apache.hadoop.hbase.Cell; - import org.apache.hadoop.hbase.HColumnDescriptor; - import org.apache.hadoop.hbase.HConstants; - import org.apache.hadoop.hbase.KeyValue; - import org.apache.hadoop.hbase.KeyValueUtil; - import org.apache.hadoop.hbase.TableName; - import org.apache.hadoop.hbase.Tag; - import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId; +import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An implementation of {@link MobFileCompactor} 
that compacts the mob files in partitions.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactor extends MobFileCompactor {
+
+ private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
+ protected long mergeableSize;
+ protected int delFileMaxCount;
+ /** The number of files compacted in a batch */
+ protected int compactionBatchSize;
+ protected int compactionKVMax;
+
+ private Path tempPath;
+ private Path bulkloadPath;
+ private CacheConfig compactionCacheConfig;
+ private Tag tableNameTag;
+
+ public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+ HColumnDescriptor column, ExecutorService pool) {
+ super(conf, fs, tableName, column, pool);
+ mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+ delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+ MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+ // default is 100
+ compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+ tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+ bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
+ tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
+ compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+ HConstants.COMPACTION_KV_MAX_DEFAULT);
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ compactionCacheConfig = new CacheConfig(copyOfConf);
+ tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+ }
+
+ @Override
+ public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
+ if (files == null || files.isEmpty()) {
+ LOG.info("No candidate mob files");
+ return null;
+ }
+ LOG.info("isForceAllFiles: " + isForceAllFiles);
+ // find the files to compact.
+ PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
+ // compact the files.
+ return performCompaction(request);
+ }
+
+ /**
+ * Selects the mob/del files to be compacted.
+ * Iterates the candidates to find out all the del files and small mob files.
+ * @param candidates All the candidates.
+ * @param isForceAllFiles Whether to add all mob files into the compaction.
+ * @return A compaction request.
+ * @throws IOException
+ */
+ protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates,
+ boolean isForceAllFiles) throws IOException {
+ Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+ Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+ new HashMap<CompactionPartitionId, CompactionPartition>();
+ int selectedFileCount = 0;
+ int irrelevantFileCount = 0;
+ for (FileStatus file : candidates) {
+ if (!file.isFile()) {
+ irrelevantFileCount++;
+ continue;
+ }
+ // group the del files and small files.
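+ // a del file holds only delete markers and its name carries the "_del" suffix,
+ // while a "small" file is a mob file whose length is below the mergeable threshold.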
+ FileStatus linkedFile = file;
+ if (HFileLink.isHFileLink(file.getPath())) {
+ HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+ linkedFile = getLinkedFileStatus(link);
+ if (linkedFile == null) {
+ // If the linked file cannot be found, regard it as an irrelevant file
+ irrelevantFileCount++;
+ continue;
+ }
+ }
+ if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+ allDelFiles.add(file);
+ } else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) {
+ // add all files if isForceAllFiles is true,
+ // otherwise add the small files to the merge pool
+ MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+ CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+ fileName.getDate());
+ CompactionPartition compactionPartition = filesToCompact.get(id);
+ if (compactionPartition == null) {
+ compactionPartition = new CompactionPartition(id);
+ compactionPartition.addFile(file);
+ filesToCompact.put(id, compactionPartition);
+ } else {
+ compactionPartition.addFile(file);
+ }
+ selectedFileCount++;
+ }
+ }
+ PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
+ filesToCompact.values(), allDelFiles);
+ if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+ // all the files are selected
+ request.setCompactionType(CompactionType.ALL_FILES);
+ }
+ LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+ + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
+ + irrelevantFileCount + " irrelevant files");
+ return request;
+ }
+
+ /**
+ * Performs the compaction on the selected files.
+ * <ol>
+ * <li>Compacts the del files.</li>
+ * <li>Compacts the selected small mob files and all the del files.</li>
+ * <li>If all the candidates are selected, archives the del files.</li>
+ * </ol>
+ * @param request The compaction request.
+ * @return The paths of new mob files generated in the compaction.
+ * @throws IOException
+ */
+ protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+ throws IOException {
+ // merge the del files
+ List<Path> delFilePaths = new ArrayList<Path>();
+ for (FileStatus delFile : request.delFiles) {
+ delFilePaths.add(delFile.getPath());
+ }
+ List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+ List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+ for (Path newDelPath : newDelPaths) {
+ StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+ newDelFiles.add(sf);
+ }
+ LOG.info("After merging, there are " + newDelFiles.size() + " del files");
+ // compact the mob files by partitions.
+ List<Path> paths = compactMobFiles(request, newDelFiles);
+ LOG.info("After compaction, there are " + paths.size() + " mob files");
+ // archive the del files if all the mob files are selected.
+ if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+ LOG.info("After a mob file compaction with all files selected, archiving the del files "
+ + newDelFiles);
+ try {
+ MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the del files " + newDelFiles, e);
+ }
+ }
+ return paths;
+ }
+
+ /**
+ * Compacts the selected small mob files and all the del files.
+ * @param request The compaction request.
+ * @param delFiles The del files.
+ * @return The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
+ final List<StoreFile> delFiles) throws IOException {
+ Collection<CompactionPartition> partitions = request.compactionPartitions;
+ if (partitions == null || partitions.isEmpty()) {
+ LOG.info("No partitions of mob files");
+ return Collections.emptyList();
+ }
+ List<Path> paths = new ArrayList<Path>();
+ Connection c = ConnectionFactory.createConnection(conf);
+ final Table table = c.getTable(tableName);
+ try {
+ Map<CompactionPartitionId, Future<List<Path>>> results =
+ new HashMap<CompactionPartitionId, Future<List<Path>>>();
+ // compact the mob files by partitions in parallel.
+ for (final CompactionPartition partition : partitions) {
+ results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+ @Override
+ public List<Path> call() throws Exception {
+ LOG.info("Compacting mob files for partition " + partition.getPartitionId());
+ return compactMobFilePartition(request, partition, delFiles, table);
+ }
+ }));
+ }
+ // wait for the partition compactions to finish and collect the results.
+ boolean hasFailure = false;
+ for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+ try {
+ paths.addAll(result.getValue().get());
+ } catch (Exception e) {
+ // just log the error
+ LOG.error("Failed to compact the partition " + result.getKey(), e);
+ hasFailure = true;
+ }
+ }
+ if (hasFailure) {
+ // if any partition fails in the compaction, directly throw an exception.
+ throw new IOException("Failed to compact the partitions");
+ }
+ } finally {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the HTable", e);
+ }
+ try {
+ c.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the Connection", e);
+ }
+ }
+ return paths;
+ }
+
+ /**
+ * Compacts a partition of selected small mob files and all the del files.
+ * @param request The compaction request.
+ * @param partition A compaction partition.
+ * @param delFiles The del files.
+ * @param table The current table.
+ * @return The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
+ CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
+ List<Path> newFiles = new ArrayList<Path>();
+ List<FileStatus> files = partition.listFiles();
+ int offset = 0;
+ Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+ Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+ while (offset < files.size()) {
+ int batch = compactionBatchSize;
+ if (files.size() - offset < compactionBatchSize) {
+ batch = files.size() - offset;
+ }
+ if (batch == 1 && delFiles.isEmpty()) {
+ // only one file left and no del files, do not compact it,
+ // and directly add it to the new files.
+ newFiles.add(files.get(offset).getPath());
+ offset++;
+ continue;
+ }
+ // clean the bulkload directory to avoid loading old files.
+ fs.delete(bulkloadPathOfPartition, true);
+ // add the selected mob files and del files into filesToCompact
+ List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+ for (int i = offset; i < batch + offset; i++) {
+ StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+ BloomType.NONE);
+ filesToCompact.add(sf);
+ }
+ filesToCompact.addAll(delFiles);
+ // compact the mob files in a batch.
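+ // each batch rewrites up to compactionBatchSize mob files, together with all the
+ // del files, into one new mob file plus one ref file that is bulkloaded back into
+ // the table.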
+ compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+ bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+ // move to the next batch.
+ offset += batch;
+ }
+ LOG.info("Compaction is finished. The number of mob files has changed from " + files.size()
+ + " to " + newFiles.size());
+ return newFiles;
+ }
+
+ /**
+ * Compacts a partition of selected small mob files and all the del files in a batch.
+ * @param request The compaction request.
+ * @param partition A compaction partition.
+ * @param table The current table.
+ * @param filesToCompact The files to be compacted.
+ * @param batch The number of mob files to be compacted in a batch.
+ * @param bulkloadPathOfPartition The directory where the bulkload files of the current
+ * partition are saved.
+ * @param bulkloadColumnPath The directory under the partition directory where the bulkload
+ * files of the current column family are saved.
+ * @param newFiles The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
+ CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
+ Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+ throws IOException {
+ // open scanner to the selected mob files and del files.
+ StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+ // the mob files to be compacted, not including the del files.
+ List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+ // Pair(maxSeqId, cellsCount)
+ Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+ // open writers for the mob files and new ref store files.
+ Writer writer = null;
+ Writer refFileWriter = null;
+ Path filePath = null;
+ Path refFilePath = null;
+ long mobCells = 0;
+ try {
+ writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+ tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+ .getStartKey(), compactionCacheConfig);
+ filePath = writer.getPath();
+ byte[] fileName = Bytes.toBytes(filePath.getName());
+ // create a temp file and open a writer for it in the bulkloadPath
+ refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+ .getSecond().longValue(), compactionCacheConfig);
+ refFilePath = refFileWriter.getPath();
+ List<Cell> cells = new ArrayList<Cell>();
+ boolean hasMore = false;
+ ScannerContext scannerContext =
+ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+ do {
+ hasMore = scanner.next(cells, scannerContext);
+ for (Cell cell : cells) {
+ // TODO remove this after the new code is introduced.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ // write the mob cell to the mob file.
+ writer.append(kv);
+ // write the new reference cell to the store file.
+ KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+ refFileWriter.append(reference);
+ mobCells++;
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ // close the scanner.
+ scanner.close();
+ // append metadata to the mob file, and close the mob file writer.
+ closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+ // append metadata and bulkload info to the ref mob file, and close the writer.
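+ // the bulkload time recorded in the ref file metadata is the selection time of
+ // this compaction request.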
+ closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+ }
+ if (mobCells > 0) {
+ // commit mob file
+ MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+ // bulkload the ref file
+ bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+ newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+ } else {
+ // remove the new files
+ // the mob file is empty, delete it instead of committing.
+ deletePath(filePath);
+ // the ref file is empty, delete it instead of committing.
+ deletePath(refFilePath);
+ }
+ // archive the old mob files, do not archive the del files.
+ try {
+ MobUtils
+ .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+ }
+ }
+
+ /**
+ * Compacts the del files in batches, which avoids opening too many files at once.
+ * @param request The compaction request.
+ * @param delFilePaths The paths of the del files.
+ * @return The paths of new del files after merging or the original files if no merging
+ * is necessary.
+ * @throws IOException
+ */
+ protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
+ List<Path> delFilePaths) throws IOException {
+ if (delFilePaths.size() <= delFileMaxCount) {
+ return delFilePaths;
+ }
+ // when there are more del files than the number that is allowed, merge them first.
+ int offset = 0;
+ List<Path> paths = new ArrayList<Path>();
+ while (offset < delFilePaths.size()) {
+ // get the batch
+ int batch = compactionBatchSize;
+ if (delFilePaths.size() - offset < compactionBatchSize) {
+ batch = delFilePaths.size() - offset;
+ }
+ List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+ if (batch == 1) {
+ // only one file left, do not compact it, directly add it to the new files.
+ paths.add(delFilePaths.get(offset));
+ offset++;
+ continue;
+ }
+ for (int i = offset; i < batch + offset; i++) {
+ batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+ BloomType.NONE));
+ }
+ // compact the del files in a batch.
+ paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+ // move to the next batch.
+ offset += batch;
+ }
+ return compactDelFiles(request, paths);
+ }
+
+ /**
+ * Compacts the del files in a batch.
+ * @param request The compaction request.
+ * @param delFiles The del files.
+ * @return The path of new del file after merging.
+ * @throws IOException
+ */
+ private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
+ List<StoreFile> delFiles) throws IOException {
+ // create a scanner for the del files.
+ StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+ Writer writer = null;
+ Path filePath = null;
+ try {
+ writer = MobUtils.createDelFileWriter(conf, fs, column,
+ MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+ column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
+ filePath = writer.getPath();
+ List<Cell> cells = new ArrayList<Cell>();
+ boolean hasMore = false;
+ ScannerContext scannerContext =
+ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+ do {
+ hasMore = scanner.next(cells, scannerContext);
+ for (Cell cell : cells) {
+ // TODO remove this after the new code is introduced.
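+ // the scanner was created with COMPACT_RETAIN_DELETES, so every delete marker
+ // is copied verbatim into the merged del file.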
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ writer.append(kv);
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ scanner.close();
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the file " + filePath, e);
+ }
+ }
+ }
+ // commit the new del file
+ Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+ // archive the old del files
+ try {
+ MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the old del files " + delFiles, e);
+ }
+ return path;
+ }
+
+ /**
+ * Creates a store scanner.
+ * @param filesToCompact The files to be compacted.
+ * @param scanType The scan type.
+ * @return The store scanner.
+ * @throws IOException
+ */
+ private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+ throws IOException {
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+ false, true, false, null, HConstants.LATEST_TIMESTAMP);
+ Scan scan = new Scan();
+ scan.setMaxVersions(column.getMaxVersions());
+ long ttl = HStore.determineTTLFromFamily(column);
- ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
++ ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
+ StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+ HConstants.LATEST_TIMESTAMP);
+ return scanner;
+ }
+
+ /**
+ * Bulkloads the current file.
+ * @param table The current table.
+ * @param bulkloadDirectory The path of bulkload directory.
+ * @param fileName The current file name.
+ * @throws IOException
+ */
+ private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+ throws IOException {
+ // bulkload the ref file
+ try {
+ LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ bulkload.doBulkLoad(bulkloadDirectory, (HTable) table);
+ } catch (Exception e) {
+ // delete the committed mob file
+ deletePath(new Path(mobFamilyDir, fileName));
+ throw new IOException(e);
+ } finally {
+ // delete the bulkload files in bulkloadPath
+ deletePath(bulkloadDirectory);
+ }
+ }
+
+ /**
+ * Closes the mob file writer.
+ * @param writer The mob file writer.
+ * @param maxSeqId Maximum sequence id.
+ * @param mobCellsCount The number of mob cells.
+ * @throws IOException
+ */
+ private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+ throws IOException {
+ if (writer != null) {
+ writer.appendMetadata(maxSeqId, false, mobCellsCount);
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+ }
+ }
+ }
+
+ /**
+ * Closes the ref file writer.
+ * @param writer The ref file writer.
+ * @param maxSeqId Maximum sequence id.
+ * @param bulkloadTime The timestamp at which the bulk load file is created.
+ * @throws IOException
+ */
+ private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+ throws IOException {
+ if (writer != null) {
+ writer.appendMetadata(maxSeqId, false);
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+ }
+ }
+ }
+
+ /**
+ * Gets the max seqId and number of cells of the store files.
+ * @param storeFiles The store files.
+ * @return The pair of the max seqId and number of cells of the store files. + * @throws IOException + */ + private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException { + long maxSeqId = 0; + long maxKeyCount = 0; + for (StoreFile sf : storeFiles) { + // the readers will be closed later after the merge. + maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); + byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); + if (count != null) { + maxKeyCount += Bytes.toLong(count); + } + } + return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount)); + } + + /** + * Deletes a file. + * @param path The path of the file to be deleted. + */ + private void deletePath(Path path) { + try { + if (path != null) { + fs.delete(path, true); + } + } catch (IOException e) { + LOG.error("Failed to delete the file " + path, e); + } + } + + private FileStatus getLinkedFileStatus(HFileLink link) throws IOException { + Path[] locations = link.getLocations(); + for (Path location : locations) { + FileStatus file = getFileStatus(location); + if (file != null) { + return file; + } + } + return null; + } + + private FileStatus getFileStatus(Path path) throws IOException { + try { + if (path != null) { + FileStatus file = fs.getFileStatus(path); + return file; + } + } catch (FileNotFoundException e) { + LOG.warn("The file " + path + " can not be found", e); + } + return null; + } +}
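For orientation, here is a minimal sketch of how a caller could drive this compactor. It is not part of this patch; the pool sizing and the listing of the mob family directory are assumptions for illustration only:

    // Hypothetical driver for PartitionedMobFileCompactor (illustrative fragment;
    // assumes conf, fs, tableName and the HColumnDescriptor column are already built
    // and that the enclosing method declares throws IOException).
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      Path mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
      List<FileStatus> candidates = Arrays.asList(fs.listStatus(mobFamilyDir));
      PartitionedMobFileCompactor compactor =
          new PartitionedMobFileCompactor(conf, fs, tableName, column, pool);
      // isForceAllFiles=false: only del files and mob files below the mergeable
      // threshold are selected for compaction.
      List<Path> newFiles = compactor.compact(candidates, false);
    } finally {
      pool.shutdown();
    }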
http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index d55822d,519edde..04d2b13 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@@ -63,15 -63,7 +63,15 @@@ public class DefaultStoreEngine extend @Override protected void createComponents( - Configuration conf, Store store, KVComparator kvComparator) throws IOException { + Configuration conf, Store store, CellComparator kvComparator) throws IOException { + createCompactor(conf, store); + createCompactionPolicy(conf, store); + createStoreFlusher(conf, store); + storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf()); + + } + + protected void createCompactor(Configuration conf, Store store) throws IOException { String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName()); try { compactor = ReflectionUtils.instantiateWithCustomCtor(className, http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index b4d2213,0000000..e4bdc74 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@@ -1,560 -1,0 +1,558 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobStoreEngine;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.util.Bytes;
++import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.IdLock;
+
+/**
+ * The store implementation to save MOBs (medium objects); it extends the HStore.
+ * When a descriptor of a column family has the value "IS_MOB", it means this column family
+ * is a mob one. When an HRegion instantiates a store for this column family, the HMobStore is
+ * created. HMobStore is almost the same as the HStore except using different types of scanners.
+ * In the method of getScanner, the MobStoreScanner and MobReversedStoreScanner are returned.
+ * In these scanners, additional seeks in the mob files are performed after the seek
+ * in HBase is done.
+ */
+@InterfaceAudience.Private
+public class HMobStore extends HStore {
-
++ private static final Log LOG = LogFactory.getLog(HMobStore.class);
+ private MobCacheConfig mobCacheConfig;
+ private Path homePath;
+ private Path mobFamilyPath;
+ private volatile long mobCompactedIntoMobCellsCount = 0;
+ private volatile long mobCompactedFromMobCellsCount = 0;
+ private volatile long mobCompactedIntoMobCellsSize = 0;
+ private volatile long mobCompactedFromMobCellsSize = 0;
+ private volatile long mobFlushCount = 0;
+ private volatile long mobFlushedCellsCount = 0;
+ private volatile long mobFlushedCellsSize = 0;
+ private volatile long mobScanCellsCount = 0;
+ private volatile long mobScanCellsSize = 0;
+ private HColumnDescriptor family;
+ private TableLockManager tableLockManager;
+ private TableName tableLockName;
+ private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
+ private final IdLock keyLock = new IdLock();
+
+ public HMobStore(final HRegion region, final HColumnDescriptor family,
+ final Configuration confParam) throws IOException {
+ super(region, family, confParam);
+ this.family = family;
+ this.mobCacheConfig = (MobCacheConfig) cacheConf;
+ this.homePath = MobUtils.getMobHome(conf);
+ this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+ family.getNameAsString());
+ List<Path> locations = new ArrayList<Path>(2);
+ locations.add(mobFamilyPath);
+ TableName tn = region.getTableDesc().getTableName();
+ locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
+ .getEncodedName(), family.getNameAsString()));
+ map.put(Bytes.toString(tn.getName()), locations);
+ if (region.getRegionServerServices() != null) {
+ tableLockManager = region.getRegionServerServices().getTableLockManager();
+ tableLockName = MobUtils.getTableLockName(getTableName());
+ }
+ }
+
+ /**
+ * Creates the mob cache config.
+ */
+ @Override
+ protected void createCacheConf(HColumnDescriptor family) {
+ cacheConf = new MobCacheConfig(conf, family);
+ }
+
+ /**
+ * Gets the current config.
+ */
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ /**
+ * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, additional seeks in
+ * the mob files are performed after the seek in HBase is done.
+ */
+ @Override
+ protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+ long readPt, KeyValueScanner scanner) throws IOException {
+ if (scanner == null) {
+ if (MobUtils.isRefOnlyScan(scan)) {
+ Filter refOnlyFilter = new MobReferenceOnlyFilter();
+ Filter filter = scan.getFilter();
+ if (filter != null) {
+ scan.setFilter(new FilterList(filter, refOnlyFilter));
+ } else {
+ scan.setFilter(refOnlyFilter);
+ }
+ }
+ scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+ targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ }
+ return scanner;
+ }
+
+ /**
+ * Creates the mob store engine.
+ */
+ @Override
+ protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
- KVComparator kvComparator) throws IOException {
++ CellComparator cellComparator) throws IOException {
+ MobStoreEngine engine = new MobStoreEngine();
- engine.createComponents(conf, store, kvComparator);
++ engine.createComponents(conf, store, cellComparator);
+ return engine;
+ }
+
+ /**
+ * Gets the temp directory.
+ * @return The temp directory.
+ */
+ private Path getTempDir() {
+ return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param date The latest date of written cells.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getTempDir();
+ return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+ }
+
+ /**
+ * Creates the writer for the del file in temp directory.
+ * The del file keeps track of the delete markers. Its name has a suffix _del,
+ * the format is [0-9a-f]+(_del)?.
+ * @param date The latest date of written cells.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the del file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getTempDir();
+ String suffix = UUID
+ .randomUUID().toString().replaceAll("-", "") + "_del";
+ MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
+ return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param date The date string, its format is yyyymmdd.
+ * @param basePath The basic path for a temp directory.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
+ .toString().replaceAll("-", ""));
+ return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param mobFileName The mob file name.
+ * @param basePath The basic path for a temp directory.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath,
+ long maxKeyCount, Compression.Algorithm compression) throws IOException {
+ final CacheConfig writerCacheConf = mobCacheConfig;
+ HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+ .withIncludesMvcc(true).withIncludesTags(true)
- .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
++ .withChecksumType(ChecksumType.getDefaultChecksumType())
+ .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+ .withBlockSize(getFamily().getBlocksize())
+ .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
+ .withEncryptionContext(cryptoContext).build();
+
+ StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+ .withFilePath(new Path(basePath, mobFileName.getFileName()))
- .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
++ .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
+ .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+ return w;
+ }
+
+ /**
+ * Commits the mob file.
+ * @param sourceFile The source file.
+ * @param targetPath The directory path where the source file is renamed to.
+ * @throws IOException
+ */
+ public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+ if (sourceFile == null) {
+ return;
+ }
+ Path dstPath = new Path(targetPath, sourceFile.getName());
+ validateMobFile(sourceFile);
+ String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+ LOG.info(msg);
+ Path parent = dstPath.getParent();
+ if (!region.getFilesystem().exists(parent)) {
+ region.getFilesystem().mkdirs(parent);
+ }
+ if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+ throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+ }
+ }
+
+ /**
+ * Validates a mob file by opening and closing it.
+ *
+ * @param path the path to the mob file
+ */
+ private void validateMobFile(Path path) throws IOException {
+ StoreFile storeFile = null;
+ try {
+ storeFile =
+ new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+ storeFile.createReader();
+ } catch (IOException e) {
+ LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+ throw e;
+ } finally {
+ if (storeFile != null) {
+ storeFile.closeReader(false);
+ }
+ }
+ }
+
+ /**
+ * Reads the cell from the mob file, ignoring the read point.
+ * @param reference The cell found in HBase; its value is the path of a mob file.
+ * @param cacheBlocks Whether the scanner should cache blocks.
+ * @return The cell found in the mob file.
+ * @throws IOException
+ */
+ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+ return resolve(reference, cacheBlocks, -1);
+ }
+
+ /**
+ * Reads the cell from the mob file.
+ * @param reference The cell found in HBase; its value is the path of a mob file.
+ * @param cacheBlocks Whether the scanner should cache blocks.
+ * @param readPt the read point.
+ * @return The cell found in the mob file.
+ * @throws IOException
+ */
+ public Cell resolve(Cell reference, boolean cacheBlocks, long readPt) throws IOException {
+ Cell result = null;
+ if (MobUtils.hasValidMobRefCellValue(reference)) {
+ String fileName = MobUtils.getMobFileName(reference);
+ Tag tableNameTag = MobUtils.getTableNameTag(reference);
+ if (tableNameTag != null) {
+ byte[] tableName = tableNameTag.getValue();
+ String tableNameString = Bytes.toString(tableName);
+ List<Path> locations = map.get(tableNameString);
+ if (locations == null) {
+ IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
+ try {
+ locations = map.get(tableNameString);
+ if (locations == null) {
+ locations = new ArrayList<Path>(2);
+ TableName tn = TableName.valueOf(tableName);
+ locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
+ locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+ .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+ map.put(tableNameString, locations);
+ }
+ } finally {
+ keyLock.releaseLockEntry(lockEntry);
+ }
+ }
+ result = readCell(locations, fileName, reference, cacheBlocks, readPt);
+ }
+ }
+ if (result == null) {
+ LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row, family,"
+ + " qualifier, timestamp, type and tags but with an empty value to return.");
+ result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+ reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+ reference.getFamilyLength(), reference.getQualifierArray(),
+ reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+ Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+ 0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+ reference.getTagsLength());
+ }
+ return result;
+ }
+
+ /**
+ * Reads the cell from a mob file.
+ * The mob file might be located in different directories.
+ * 1. The working directory.
+ * 2. The archive directory.
+ * Reads the cell from the files located in both of the above directories.
+ * @param locations The possible locations where the mob files are saved.
+ * @param fileName The file to be read.
+ * @param search The cell to be searched.
+ * @param cacheMobBlocks Whether the scanner should cache blocks.
+ * @param readPt the read point.
+ * @return The found cell. Null if there's no such cell.
+ * @throws IOException
+ */
+ private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
+ long readPt) throws IOException {
+ FileSystem fs = getFileSystem();
+ for (Path location : locations) {
+ MobFile file = null;
+ Path path = new Path(location, fileName);
+ try {
+ file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+ return readPt != -1 ?
file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
+ cacheMobBlocks);
+ } catch (IOException e) {
+ mobCacheConfig.getMobFileCache().evictFile(fileName);
+ if ((e instanceof FileNotFoundException) ||
+ (e.getCause() instanceof FileNotFoundException)) {
+ LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
+ } else {
+ throw e;
+ }
+ } catch (NullPointerException e) {
+ mobCacheConfig.getMobFileCache().evictFile(fileName);
+ LOG.warn("Fail to read the cell", e);
+ } catch (AssertionError e) {
+ mobCacheConfig.getMobFileCache().evictFile(fileName);
+ LOG.warn("Fail to read the cell", e);
+ } finally {
+ if (file != null) {
+ mobCacheConfig.getMobFileCache().closeFile(file);
+ }
+ }
+ }
+ LOG.error("The mob file " + fileName + " could not be found in the locations "
+ + locations);
+ return null;
+ }
+
+ /**
+ * Gets the mob file path.
+ * @return The mob file path.
+ */
+ public Path getPath() {
+ return mobFamilyPath;
+ }
+
+ /**
+ * The compaction in the mob store.
+ * The cells in this store contain the paths of the mob files. There might be a race
+ * condition between the major compaction and the sweeping in mob files.
+ * In order to avoid this, we need to mutually exclude the running of the major compaction
+ * and the sweeping in mob files.
+ * The minor compaction is not affected.
+ * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
+ */
+ @Override
+ public List<StoreFile> compact(CompactionContext compaction,
+ CompactionThroughputController throughputController) throws IOException {
+ // If it's a major compaction, try to find out whether a sweeper is running.
+ // If yes, mark the major compaction as retainDeleteMarkers
+ if (compaction.getRequest().isAllFiles()) {
+ // Use the Zookeeper to coordinate.
+ // 1. Acquire an operation lock.
+ // 1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction.
+ // 1.2. If the lock is obtained, search the node of sweeping.
+ // 1.2.1. If the node is there, the sweeping is in progress, mark the major
+ // compaction as retainDeleteMarkers and continue the compaction.
+ // 1.2.2. If the node is not there, add a child to the major compaction node, and
+ // run the compaction directly.
+ TableLock lock = null;
+ if (tableLockManager != null) {
+ lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
+ }
+ boolean tableLocked = false;
+ String tableName = getTableName().getNameAsString();
+ if (lock != null) {
+ try {
+ LOG.info("Start to acquire a read lock for the table[" + tableName
+ + "], ready to perform the major compaction");
+ lock.acquire();
+ tableLocked = true;
+ } catch (Exception e) {
+ LOG.error("Fail to lock the table " + tableName, e);
+ }
+ } else {
+ // If the tableLockManager is null, mark the tableLocked as true.
+ tableLocked = true;
+ }
+ try {
+ if (!tableLocked) {
+ LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
+ + tableName + "], forcing the delete markers to be retained");
+ compaction.getRequest().forceRetainDeleteMarkers();
+ }
+ return super.compact(compaction, throughputController);
+ } finally {
+ if (tableLocked && lock != null) {
+ try {
+ lock.release();
+ } catch (IOException e) {
+ LOG.error("Fail to release the table lock " + tableName, e);
+ }
+ }
+ }
+ } else {
+ // If it's not a major compaction, continue the compaction.
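+ // minor compactions never drop delete markers, so they cannot conflict with
+ // the sweep tool.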
+ return super.compact(compaction, throughputController); + } + } + + public void updateMobCompactedIntoMobCellsCount(long count) { + mobCompactedIntoMobCellsCount += count; + } + + public long getMobCompactedIntoMobCellsCount() { + return mobCompactedIntoMobCellsCount; + } + + public void updateMobCompactedFromMobCellsCount(long count) { + mobCompactedFromMobCellsCount += count; + } + + public long getMobCompactedFromMobCellsCount() { + return mobCompactedFromMobCellsCount; + } + + public void updateMobCompactedIntoMobCellsSize(long size) { + mobCompactedIntoMobCellsSize += size; + } + + public long getMobCompactedIntoMobCellsSize() { + return mobCompactedIntoMobCellsSize; + } + + public void updateMobCompactedFromMobCellsSize(long size) { + mobCompactedFromMobCellsSize += size; + } + + public long getMobCompactedFromMobCellsSize() { + return mobCompactedFromMobCellsSize; + } + + public void updateMobFlushCount() { + mobFlushCount++; + } + + public long getMobFlushCount() { + return mobFlushCount; + } + + public void updateMobFlushedCellsCount(long count) { + mobFlushedCellsCount += count; + } + + public long getMobFlushedCellsCount() { + return mobFlushedCellsCount; + } + + public void updateMobFlushedCellsSize(long size) { + mobFlushedCellsSize += size; + } + + public long getMobFlushedCellsSize() { + return mobFlushedCellsSize; + } + + public void updateMobScanCellsCount(long count) { + mobScanCellsCount += count; + } + + public long getMobScanCellsCount() { + return mobScanCellsCount; + } + + public void updateMobScanCellsSize(long size) { + mobScanCellsSize += size; + } + + public long getMobScanCellsSize() { + return mobScanCellsSize; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index ad7ce98,5d7248d..552ffd2 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@@ -55,8 -55,8 +55,9 @@@ import org.apache.hadoop.hbase.HColumnD import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@@ -344,27 -344,6 +345,27 @@@ public class HStore implements Store } /** + * Creates the cache config. + * @param family The current column family. + */ + protected void createCacheConf(final HColumnDescriptor family) { + this.cacheConf = new CacheConfig(conf, family); + } + + /** + * Creates the store engine configured for the given Store. + * @param store The store. An unfortunate dependency needed due to it + * being passed to coprocessors via the compactor. 
+ * @param conf Store configuration. + * @param kvComparator KVComparator for storeFileManager. + * @return StoreEngine to use. + */ + protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf, - KVComparator kvComparator) throws IOException { ++ CellComparator kvComparator) throws IOException { + return StoreEngine.create(store, conf, comparator); + } + + /** * @param family * @return TTL in seconds of the specified family */ http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 090be8c,bc8dd01..68ce76a --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@@ -98,10 -98,13 +98,11 @@@ public class DefaultCompactor extends C smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } + - // When all MVCC readpoints are 0, don't write them. - // See HBASE-8166, HBASE-12600, and HBASE-13389. - writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, - fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0); + writer = createTmpWriter(fd, smallestReadPoint); boolean finished = - performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController); + performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController, + request.isAllFiles()); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); @@@ -145,20 -148,6 +146,24 @@@ } /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. 
+ * @throws IOException + */ - protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) - throws IOException { ++ protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException { ++ // When all MVCC readpoints are 0, don't write them. ++ // See HBASE-8166, HBASE-12600, and HBASE-13389. ++ ++ // make this writer with tags always because of possible new cells with tags. + StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, - true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); ++ true, fd.maxMVCCReadpoint >= 0, fd.maxTagsLength >0); + return writer; + } + ++ + /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to * {@link #compact(CompactionRequest, CompactionThroughputController)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/09a00efc/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java index ed3853e,0000000..544d145 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java @@@ -1,441 -1,0 +1,436 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mob.filecompactions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; - import org.apache.hadoop.hbase.Cell; - import org.apache.hadoop.hbase.HBaseTestingUtility; - import org.apache.hadoop.hbase.HColumnDescriptor; - import org.apache.hadoop.hbase.HConstants; - import org.apache.hadoop.hbase.KeyValue; ++import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.testclassification.LargeTests; - import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestPartitionedMobFileCompactor { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String family = "family"; + private final static String qf = "qf"; + private HColumnDescriptor hcd = new HColumnDescriptor(family); + private Configuration conf = TEST_UTIL.getConfiguration(); + private CacheConfig cacheConf = new CacheConfig(conf); + private FileSystem fs; + private List<FileStatus> mobFiles = new ArrayList<>(); + private List<FileStatus> delFiles = new ArrayList<>(); + private List<FileStatus> allFiles = new ArrayList<>(); + private Path basePath; + private String mobSuffix; + private String delSuffix; + private static ExecutorService pool; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.startMiniCluster(1); + pool = createThreadPool(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + pool.shutdown(); + TEST_UTIL.shutdownMiniCluster(); + } + + private void init(String tableName) throws Exception { + fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); + basePath = new Path(new 
+    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
+    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+  }
+
+  @Test
+  public void testCompactionSelectWithAllFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithAllFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files.
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for (FileStatus file : mobFiles) {
+      if (file.getLen() < mergeSize) {
+        String fileName = file.getPath().getName();
+        String startKey = fileName.substring(0, 32);
+        expectedStartKeys.add(startKey);
+      }
+    }
+    testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactionSelectWithPartFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithPartFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files.
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = 4000;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for (FileStatus file : mobFiles) {
+      if (file.getLen() < mergeSize) {
+        String fileName = file.getPath().getName();
+        String startKey = fileName.substring(0, 32);
+        expectedStartKeys.add(startKey);
+      }
+    }
+    // set the mob file compaction mergeable threshold.
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+    testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactionSelectWithForceAllFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithForceAllFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files.
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    long mergeSize = 4000;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for (FileStatus file : mobFiles) {
+      String fileName = file.getPath().getName();
+      String startKey = fileName.substring(0, 32);
+      expectedStartKeys.add(startKey);
+    }
+    // set the mob file compaction mergeable threshold.
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+    testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithDefaultBatchSize";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files.
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+    testCompactDelFiles(tableName, 1, 13, false);
+  }
+
+  @Test
+  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithSmallBatchSize";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files.
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // set the mob file compaction batch size.
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
+    testCompactDelFiles(tableName, 1, 13, false);
+  }
+
+  @Test
+  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesChangeMaxDelFileCount";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files.
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // set the max del file count.
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
+    // set the mob file compaction batch size.
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
+    testCompactDelFiles(tableName, 4, 13, false);
+  }
+
+  /**
+   * Tests the selectFiles.
+   * @param tableName the table name
+   * @param type the expected compaction type
+   * @param isForceAllFiles whether all the mob files are forced to be selected
+   * @param expected the expected start keys
+   */
+  private void testSelectFiles(String tableName, final CompactionType type,
+      final boolean isForceAllFiles, final List<String> expected) throws IOException {
+    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+        TableName.valueOf(tableName), hcd, pool) {
+      @Override
+      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
+          throws IOException {
+        if (files == null || files.isEmpty()) {
+          return null;
+        }
+        PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
+        // assert the compaction type.
+        Assert.assertEquals(type, request.type);
+        // assert that the right partitions are selected.
+        compareCompactedPartitions(expected, request.compactionPartitions);
+        // assert that the right del files are selected.
+        compareDelFiles(request.delFiles);
+        return null;
+      }
+    };
+    compactor.compact(allFiles, isForceAllFiles);
+  }
+
+  /**
+   * Tests the compactDelFiles.
+   * @param tableName the table name
+   * @param expectedFileCount the expected file count
+   * @param expectedCellCount the expected cell count
+   * @param isForceAllFiles whether all the mob files are forced to be selected
+   */
+  private void testCompactDelFiles(String tableName, final int expectedFileCount,
+      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
+    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+        TableName.valueOf(tableName), hcd, pool) {
+      @Override
+      protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+          throws IOException {
+        List<Path> delFilePaths = new ArrayList<Path>();
+        for (FileStatus delFile : request.delFiles) {
+          delFilePaths.add(delFile.getPath());
+        }
+        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+        // assert that the del files are merged.
+        Assert.assertEquals(expectedFileCount, newDelPaths.size());
+        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
+        return null;
+      }
+    };
+    compactor.compact(allFiles, isForceAllFiles);
+  }
+
+  /**
+   * Lists the files in the path.
+   */
+  private void listFiles() throws IOException {
+    for (FileStatus file : fs.listStatus(basePath)) {
+      allFiles.add(file);
+      if (file.getPath().getName().endsWith("_del")) {
+        delFiles.add(file);
+      } else {
+        mobFiles.add(file);
+      }
+    }
+  }
+
+  /**
+   * Compares the compacted partitions.
+   * @param expected the expected start keys
+   * @param partitions the collection of CompactionPartitions
+   */
+  private void compareCompactedPartitions(List<String> expected,
+      Collection<CompactionPartition> partitions) {
+    List<String> actualKeys = new ArrayList<>();
+    for (CompactionPartition partition : partitions) {
+      actualKeys.add(partition.getPartitionId().getStartKey());
+    }
+    Collections.sort(expected);
+    Collections.sort(actualKeys);
+    Assert.assertEquals(expected.size(), actualKeys.size());
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals(expected.get(i), actualKeys.get(i));
+    }
+  }
+
+  /**
+   * Compares the del files.
+   * @param allDelFiles all the del files
+   */
+  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+    int i = 0;
+    for (FileStatus file : allDelFiles) {
+      Assert.assertEquals(delFiles.get(i), file);
+      i++;
+    }
+  }
+
+  /**
+   * Creates store files.
+   * @param basePath the path to create the files under
+   * @param family the family name
+   * @param qualifier the column qualifier
+   * @param count the number of store files
+   * @param type the key type
+   */
+  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+      Type type) throws IOException {
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    String startKey = "row_";
+    MobFileName mobFileName = null;
+    for (int i = 0; i < count; i++) {
+      byte[] startRow = Bytes.toBytes(startKey + i);
+      if (type.equals(Type.Delete)) {
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), delSuffix);
+      }
+      if (type.equals(Type.Put)) {
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), mobSuffix);
+      }
+      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+          .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
+          type, (i + 1) * 1000);
+    }
+  }
+
+  /**
+   * Writes data to a store file.
+   * @param writer the store file writer
+   * @param row the row key
+   * @param family the family name
+   * @param qualifier the column qualifier
+   * @param type the key type
+   * @param size the size of the value
+   */
+  private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family,
+      byte[] qualifier, Type type, int size) throws IOException {
+    long now = System.currentTimeMillis();
+    try {
+      byte[] dummyData = new byte[size];
+      new Random().nextBytes(dummyData);
+      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Gets the number of del cells in the del files.
+   * @param paths the del file paths
+   * @return the number of del cells
+   */
+  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    int size = 0;
+    for (Path path : paths) {
+      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+      sfs.add(sf);
+    }
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+        false, null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(hcd.getMaxVersions());
+    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
+    long ttl = HStore.determineTTLFromFamily(hcd);
+    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
+        scanners, 0L, HConstants.LATEST_TIMESTAMP);
+    List<Cell> results = new ArrayList<>();
+    boolean hasMore = true;
+
+    while (hasMore) {
+      hasMore = scanner.next(results);
+      size += results.size();
+      results.clear();
+    }
+    scanner.close();
+    return size;
+  }
+
+  private static ExecutorService createThreadPool() {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+        TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+        new RejectedExecutionHandler() {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            try {
+              // block until a worker thread picks the task up, instead of throwing.
+              queue.put(r);
+            } catch (InterruptedException e) {
+              throw new RejectedExecutionException(e);
+            }
+          }
+        });
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Resets the configuration.
+   */
+  private void resetConf() {
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+        MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+        MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+  }
+}
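
A side note on createThreadPool() above: it pairs a SynchronousQueue with a RejectedExecutionHandler whose handler re-submits via the blocking queue.put(r), so once all ten workers are busy a caller blocks until a worker frees up rather than failing with RejectedExecutionException. A minimal standalone sketch of the same pattern follows; the class name, pool sizes, and main() driver are illustrative, not from the patch.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitPoolSketch {

  // Builds a pool whose rejection handler blocks the submitter until a
  // worker is free, mirroring createThreadPool() in the test above.
  static ThreadPoolExecutor newBlockingPool(int maxThreads) {
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
        new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
              // SynchronousQueue.put blocks until an idle worker polls the task.
              queue.put(r);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RejectedExecutionException(e);
            }
          }
        });
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = newBlockingPool(2);
    for (int i = 0; i < 5; i++) {
      final int id = i;
      // With only two workers, later submissions block here until one is free.
      pool.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}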

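For orientation, the tests drive the compactor through the same two calls a production caller would use: construct a PartitionedMobFileCompactor over the listing of a mob family directory and invoke compact. A condensed sketch of that flow, assuming conf, fs, and pool are initialized as in the test above; the table name, family name, and mobFamilyDir path below are placeholders, not values from the patch.

// Sketch only: "someTable"/"someFamily"/mobFamilyDir are hypothetical.
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(
    conf, fs, TableName.valueOf("someTable"), new HColumnDescriptor("someFamily"), pool);
List<FileStatus> files = Arrays.asList(fs.listStatus(mobFamilyDir));
// Passing true forces an ALL_FILES compaction even for files above the
// mergeable threshold; false lets select() fall back to PART_FILES.
List<Path> newFiles = compactor.compact(files, true);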