http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 3c8fa87,0000000..aba81eb mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@@ -1,546 -1,0 +1,548 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.NavigableSet; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.MobCacheConfig; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFile; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobStoreEngine; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; ++import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; + +/** + * The store implementation to save MOBs (medium objects), it extends the HStore. + * When a descriptor of a column family has the value "IS_MOB", it means this column family + * is a mob one. When a HRegion instantiate a store for this column family, the HMobStore is + * created. + * HMobStore is almost the same with the HStore except using different types of scanners. + * In the method of getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. + * In these scanners, a additional seeks in the mob files should be performed after the seek + * to HBase is done. + * The store implements how we save MOBs by extending HStore. When a descriptor + * of a column family has the value "IS_MOB", it means this column family is a mob one. When a + * HRegion instantiate a store for this column family, the HMobStore is created. HMobStore is + * almost the same with the HStore except using different types of scanners. In the method of + * getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. In these scanners, a + * additional seeks in the mob files should be performed after the seek in HBase is done. + */ [email protected] +public class HMobStore extends HStore { + + private MobCacheConfig mobCacheConfig; + private Path homePath; + private Path mobFamilyPath; + private volatile long mobCompactedIntoMobCellsCount = 0; + private volatile long mobCompactedFromMobCellsCount = 0; + private volatile long mobCompactedIntoMobCellsSize = 0; + private volatile long mobCompactedFromMobCellsSize = 0; + private volatile long mobFlushCount = 0; + private volatile long mobFlushedCellsCount = 0; + private volatile long mobFlushedCellsSize = 0; + private volatile long mobScanCellsCount = 0; + private volatile long mobScanCellsSize = 0; + private List<Path> mobDirLocations; + private HColumnDescriptor family; + private TableLockManager tableLockManager; + private TableName tableLockName; + + public HMobStore(final HRegion region, final HColumnDescriptor family, + final Configuration confParam) throws IOException { + super(region, family, confParam); + this.family = family; + this.mobCacheConfig = (MobCacheConfig) cacheConf; + this.homePath = MobUtils.getMobHome(conf); + this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(), + family.getNameAsString()); + mobDirLocations = new ArrayList<Path>(); + mobDirLocations.add(mobFamilyPath); + TableName tn = region.getTableDesc().getTableName(); + mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils + .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); + if (region.getRegionServerServices() != null) { + tableLockManager = region.getRegionServerServices().getTableLockManager(); + tableLockName = MobUtils.getTableLockName(getTableName()); + } + } + + /** + * Creates the mob cache config. + */ + @Override + protected void createCacheConf(HColumnDescriptor family) { + cacheConf = new MobCacheConfig(conf, family); + } + + /** + * Gets current config. + */ + public Configuration getConfiguration() { + return this.conf; + } + + /** + * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in + * the mob files should be performed after the seek in HBase is done. + */ + @Override + protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols, + long readPt, KeyValueScanner scanner) throws IOException { + if (scanner == null) { + if (MobUtils.isRefOnlyScan(scan)) { + Filter refOnlyFilter = new MobReferenceOnlyFilter(); + Filter filter = scan.getFilter(); + if (filter != null) { + scan.setFilter(new FilterList(filter, refOnlyFilter)); + } else { + scan.setFilter(refOnlyFilter); + } + } + scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, + targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + } + return scanner; + } + + /** + * Creates the mob store engine. + */ + @Override + protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf, + KVComparator kvComparator) throws IOException { + MobStoreEngine engine = new MobStoreEngine(); + engine.createComponents(conf, store, kvComparator); + return engine; + } + + /** + * Gets the temp directory. + * @return The temp directory. + */ + private Path getTempDir() { + return new Path(homePath, MobConstants.TEMP_DIR_NAME); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey); + } + + /** + * Creates the writer for the del file in temp directory. + * The del file keeps tracking the delete markers. Its name has a suffix _del, + * the format is [0-9a-f]+(_del)?. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the del file. + * @throws IOException + */ + public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + String suffix = UUID + .randomUUID().toString().replaceAll("-", "") + "_del"; + MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix); + return createWriterInTmp(mobFileName, path, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() + .toString().replaceAll("-", "")); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, + Compression.Algorithm compression) throws IOException { + final CacheConfig writerCacheConf = mobCacheConfig; + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withIncludesMvcc(false).withIncludesTags(true) + .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) + .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) + .withBlockSize(getFamily().getBlocksize()) + .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build(); + + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem()) + .withFilePath(new Path(basePath, mobFileName.getFileName())) + .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Commits the mob file. + * @param sourceFile The source file. + * @param targetPath The directory path where the source file is renamed to. + * @throws IOException + */ + public void commitFile(final Path sourceFile, Path targetPath) throws IOException { + if (sourceFile == null) { + return; + } + Path dstPath = new Path(targetPath, sourceFile.getName()); + validateMobFile(sourceFile); + String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; + LOG.info(msg); + Path parent = dstPath.getParent(); + if (!region.getFilesystem().exists(parent)) { + region.getFilesystem().mkdirs(parent); + } + if (!region.getFilesystem().rename(sourceFile, dstPath)) { + throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); + } + } + + /** + * Validates a mob file by opening and closing it. + * + * @param path the path to the mob file + */ + private void validateMobFile(Path path) throws IOException { + StoreFile storeFile = null; + try { + storeFile = + new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE); + storeFile.createReader(); + } catch (IOException e) { + LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeReader(false); + } + } + } + + /** + * Reads the cell from the mob file. + * @param reference The cell found in the HBase, its value is a path to a mob file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @return The cell found in the mob file. + * @throws IOException + */ + public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { + Cell result = null; + if (MobUtils.hasValidMobRefCellValue(reference)) { + String fileName = MobUtils.getMobFileName(reference); + result = readCell(mobDirLocations, fileName, reference, cacheBlocks); + if (result == null) { + result = readClonedCell(fileName, reference, cacheBlocks); + } + } + if (result == null) { + LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family," + + "qualifier,timestamp,type and tags but with an empty value to return."); + result = new KeyValue(reference.getRowArray(), reference.getRowOffset(), + reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(), + reference.getFamilyLength(), reference.getQualifierArray(), + reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(), + Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY, + 0, 0, reference.getTagsArray(), reference.getTagsOffset(), + reference.getTagsLength()); + } + return result; + } + + /** + * Reads the cell from a mob file. + * The mob file might be located in different directories. + * 1. The working directory. + * 2. The archive directory. + * Reads the cell from the files located in both of the above directories. + * @param locations The possible locations where the mob files are saved. + * @param fileName The file to be read. + * @param search The cell to be searched. + * @param cacheMobBlocks Whether the scanner should cache blocks. + * @return The found cell. Null if there's no such a cell. + * @throws IOException + */ + private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks) + throws IOException { + FileSystem fs = getFileSystem(); + for (Path location : locations) { + MobFile file = null; + Path path = new Path(location, fileName); + try { + file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig); + return file.readCell(search, cacheMobBlocks); + } catch (IOException e) { + mobCacheConfig.getMobFileCache().evictFile(fileName); + if (e instanceof FileNotFoundException) { + LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e); + } else { + throw e; + } + } finally { + if (file != null) { + mobCacheConfig.getMobFileCache().closeFile(file); + } + } + } + LOG.error("The mob file " + fileName + " could not be found in the locations " + + mobDirLocations); + return null; + } + + /** + * Reads the cell from a mob file of source table. + * The table might be cloned, in this case only hfile link is created in the new table, + * and the mob file is located in the source table directories. + * 1. The working directory of the source table. + * 2. The archive directory of the source table. + * Reads the cell from the files located in both of the above directories. + * @param fileName The file to be read. + * @param search The cell to be searched. + * @param cacheMobBlocks Whether the scanner should cache blocks. + * @return The found cell. Null if there's no such a cell. + * @throws IOException + */ + private Cell readClonedCell(String fileName, Cell search, boolean cacheMobBlocks) + throws IOException { + Tag tableNameTag = MobUtils.getTableNameTag(search); + if (tableNameTag == null) { + return null; + } + byte[] tableName = tableNameTag.getValue(); + if (Bytes.equals(this.getTableName().getName(), tableName)) { + return null; + } + // the possible locations in the source table. + List<Path> locations = new ArrayList<Path>(); + TableName tn = TableName.valueOf(tableName); + locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString())); + locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn) + .getEncodedName(), family.getNameAsString())); + // read the cell from the source table. + return readCell(locations, fileName, search, cacheMobBlocks); + } + + /** + * Gets the mob file path. + * @return The mob file path. + */ + public Path getPath() { + return mobFamilyPath; + } + + /** + * The compaction in the store of mob. + * The cells in this store contains the path of the mob files. There might be race + * condition between the major compaction and the sweeping in mob files. + * In order to avoid this, we need mutually exclude the running of the major compaction and + * sweeping in mob files. + * The minor compaction is not affected. + * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress. + */ + @Override - public List<StoreFile> compact(CompactionContext compaction) throws IOException { ++ public List<StoreFile> compact(CompactionContext compaction, ++ CompactionThroughputController throughputController) throws IOException { + // If it's major compaction, try to find whether there's a sweeper is running + // If yes, mark the major compaction as retainDeleteMarkers + if (compaction.getRequest().isAllFiles()) { + // Use the Zookeeper to coordinate. + // 1. Acquire a operation lock. + // 1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction. + // 1.2. If the lock is obtained, search the node of sweeping. + // 1.2.1. If the node is there, the sweeping is in progress, mark the major + // compaction as retainDeleteMarkers and continue the compaction. + // 1.2.2. If the node is not there, add a child to the major compaction node, and + // run the compaction directly. + TableLock lock = null; + if (tableLockManager != null) { + lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore"); + } + boolean tableLocked = false; + String tableName = getTableName().getNameAsString(); + if (lock != null) { + try { + LOG.info("Start to acquire a read lock for the table[" + tableName + + "], ready to perform the major compaction"); + lock.acquire(); + tableLocked = true; + } catch (Exception e) { + LOG.error("Fail to lock the table " + tableName, e); + } + } else { + // If the tableLockManager is null, mark the tableLocked as true. + tableLocked = true; + } + try { + if (!tableLocked) { + LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table[" + + tableName + "], forcing the delete markers to be retained"); + compaction.getRequest().forceRetainDeleteMarkers(); + } - return super.compact(compaction); ++ return super.compact(compaction, throughputController); + } finally { + if (tableLocked && lock != null) { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Fail to release the table lock " + tableName, e); + } + } + } + } else { + // If it's not a major compaction, continue the compaction. - return super.compact(compaction); ++ return super.compact(compaction, throughputController); + } + } + + public void updateMobCompactedIntoMobCellsCount(long count) { + mobCompactedIntoMobCellsCount += count; + } + + public long getMobCompactedIntoMobCellsCount() { + return mobCompactedIntoMobCellsCount; + } + + public void updateMobCompactedFromMobCellsCount(long count) { + mobCompactedFromMobCellsCount += count; + } + + public long getMobCompactedFromMobCellsCount() { + return mobCompactedFromMobCellsCount; + } + + public void updateMobCompactedIntoMobCellsSize(long size) { + mobCompactedIntoMobCellsSize += size; + } + + public long getMobCompactedIntoMobCellsSize() { + return mobCompactedIntoMobCellsSize; + } + + public void updateMobCompactedFromMobCellsSize(long size) { + mobCompactedFromMobCellsSize += size; + } + + public long getMobCompactedFromMobCellsSize() { + return mobCompactedFromMobCellsSize; + } + + public void updateMobFlushCount() { + mobFlushCount++; + } + + public long getMobFlushCount() { + return mobFlushCount; + } + + public void updateMobFlushedCellsCount(long count) { + mobFlushedCellsCount += count; + } + + public long getMobFlushedCellsCount() { + return mobFlushedCellsCount; + } + + public void updateMobFlushedCellsSize(long size) { + mobFlushedCellsSize += size; + } + + public long getMobFlushedCellsSize() { + return mobFlushedCellsSize; + } + + public void updateMobScanCellsCount(long count) { + mobScanCellsCount += count; + } + + public long getMobScanCellsCount() { + return mobScanCellsCount; + } + + public void updateMobScanCellsSize(long size) { + mobScanCellsSize += size; + } + + public long getMobScanCellsSize() { + return mobScanCellsSize; + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index df5c900,53e732a..ab0165d --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@@ -33,9 -34,9 +34,10 @@@ import java.util.HashSet import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NavigableSet; + import java.util.RandomAccess; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e11aac2,c170a65..787828b --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@@ -347,9 -372,7 +373,9 @@@ public class HRegionServer extends HasT private final RegionServerAccounting regionServerAccounting; // Cache configuration and block cache reference - final CacheConfig cacheConfig; + protected CacheConfig cacheConfig; + // Cache configuration for mob + final MobCacheConfig mobCacheConfig; /** The health check chore. */ private HealthCheckChore healthCheckChore; @@@ -831,10 -933,9 +938,10 @@@ } } // Send cache a shutdown. - if (cacheConfig.isBlockCacheEnabled()) { + if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) { cacheConfig.getBlockCache().shutdown(); } + mobCacheConfig.getMobFileCache().shutdown(); if (movedRegionsCleaner != null) { movedRegionsCleaner.stop("Region Server stopping"); http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b9f4038,252e5e1..f5bb67a --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@@ -55,10 -55,12 +55,13 @@@ import org.apache.hadoop.hbase.HColumnD import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; - import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; @@@ -133,11 -137,11 +138,11 @@@ public class HStore implements Store protected final MemStore memstore; // This stores directory in the filesystem. - private final HRegion region; + protected final HRegion region; private final HColumnDescriptor family; private final HRegionFileSystem fs; - protected final Configuration conf; - private Configuration conf; - private final CacheConfig cacheConf; ++ protected Configuration conf; + protected CacheConfig cacheConf; private long lastCompactSize = 0; volatile boolean forceMajor = false; /* how many bytes to write between status checks */ http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 4384d87,5e5590d..159ec55 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@@ -32,10 -33,11 +33,13 @@@ import org.apache.hadoop.hbase.ServerNa import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.mob.MobCacheConfig; +import org.apache.hadoop.hbase.mob.MobFileCache; + import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.metrics2.MetricsExecutor; /** @@@ -50,11 -52,10 +54,11 @@@ class MetricsRegionServerWrapperImp private final HRegionServer regionServer; private BlockCache blockCache; + private MobFileCache mobFileCache; private volatile long numStores = 0; - private volatile long numHLogFiles = 0; - private volatile long hlogFileSize = 0; + private volatile long numWALFiles = 0; + private volatile long walFileSize = 0; private volatile long numStoreFiles = 0; private volatile long memstoreSize = 0; private volatile long storeFileSize = 0; @@@ -75,20 -76,7 +79,21 @@@ private volatile long flushedCellsSize = 0; private volatile long compactedCellsSize = 0; private volatile long majorCompactedCellsSize = 0; + private volatile long mobCompactedIntoMobCellsCount = 0; + private volatile long mobCompactedFromMobCellsCount = 0; + private volatile long mobCompactedIntoMobCellsSize = 0; + private volatile long mobCompactedFromMobCellsSize = 0; + private volatile long mobFlushCount = 0; + private volatile long mobFlushedCellsCount = 0; + private volatile long mobFlushedCellsSize = 0; + private volatile long mobScanCellsCount = 0; + private volatile long mobScanCellsSize = 0; + private volatile long mobFileCacheAccessCount = 0; + private volatile long mobFileCacheMissCount = 0; + private volatile double mobFileCacheHitRatio = 0; + private volatile long mobFileCacheEvictedCount = 0; + private volatile long mobFileCacheCount = 0; + private volatile long blockedRequestsCount = 0L; private CacheStats cacheStats; private ScheduledExecutorService executor; @@@ -526,15 -450,7 +549,16 @@@ long tempFlushedCellsSize = 0; long tempCompactedCellsSize = 0; long tempMajorCompactedCellsSize = 0; + long tempMobCompactedIntoMobCellsCount = 0; + long tempMobCompactedFromMobCellsCount = 0; + long tempMobCompactedIntoMobCellsSize = 0; + long tempMobCompactedFromMobCellsSize = 0; + long tempMobFlushCount = 0; + long tempMobFlushedCellsCount = 0; + long tempMobFlushedCellsSize = 0; + long tempMobScanCellsCount = 0; + long tempMobScanCellsSize = 0; + long tempBlockedRequestsCount = 0L; for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); @@@ -631,20 -526,22 +646,36 @@@ flushedCellsSize = tempFlushedCellsSize; compactedCellsSize = tempCompactedCellsSize; majorCompactedCellsSize = tempMajorCompactedCellsSize; + mobCompactedIntoMobCellsCount = tempMobCompactedIntoMobCellsCount; + mobCompactedFromMobCellsCount = tempMobCompactedFromMobCellsCount; + mobCompactedIntoMobCellsSize = tempMobCompactedIntoMobCellsSize; + mobCompactedFromMobCellsSize = tempMobCompactedFromMobCellsSize; + mobFlushCount = tempMobFlushCount; + mobFlushedCellsCount = tempMobFlushedCellsCount; + mobFlushedCellsSize = tempMobFlushedCellsSize; + mobScanCellsCount = tempMobScanCellsCount; + mobScanCellsSize = tempMobScanCellsSize; + mobFileCacheAccessCount = mobFileCache.getAccessCount(); + mobFileCacheMissCount = mobFileCache.getMissCount(); + mobFileCacheHitRatio = mobFileCache.getHitRatio(); + mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount(); + mobFileCacheCount = mobFileCache.getCacheSize(); + blockedRequestsCount = tempBlockedRequestsCount; } } + + @Override + public long getHedgedReadOps() { + return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps(); + } + + @Override + public long getHedgedReadWins() { + return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins(); + } + + @Override + public long getBlockedRequestsCount() { + return blockedRequestsCount; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 13967c2,a92c17e..7870040 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@@ -43,10 -43,9 +43,12 @@@ import org.apache.hadoop.hbase.regionse import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.util.StringUtils; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; /** * A compactor is a compaction algorithm associated a given policy. Base class also contains @@@ -226,64 -220,87 +233,90 @@@ public abstract class Compactor return store.getCoprocessorHost().preCompact(store, scanner, scanType, request); } ++ // TODO mob introduced the fd parameter; can we make this cleaner and easier to extend in future? /** * Performs the compaction. - * @param fd File details ++ * @param fd FileDetails of cell sink writer * @param scanner Where to read from. * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param major Is a major compaction. * @return Whether compaction ended; false if it was interrupted for some reason. */ - protected boolean performCompaction(InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, - long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException { - int bytesWritten = 0; + long smallestReadPoint, boolean cleanSeqId, - CompactionThroughputController throughputController) throws IOException { ++ CompactionThroughputController throughputController, boolean major) throws IOException { + long bytesWritten = 0; + long bytesWrittenProgress = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. - List<Cell> kvs = new ArrayList<Cell>(); - int closeCheckInterval = HStore.getCloseCheckInterval(); - long lastMillis; + List<Cell> cells = new ArrayList<Cell>(); + long closeCheckInterval = HStore.getCloseCheckInterval(); + long lastMillis = 0; if (LOG.isDebugEnabled()) { - lastMillis = System.currentTimeMillis(); - } else { - lastMillis = 0; + lastMillis = EnvironmentEdgeManager.currentTime(); } + String compactionName = + store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString(); + long now = 0; boolean hasMore; - do { - hasMore = scanner.next(kvs, compactionKVMax); - // output to writer: - for (Cell c : kvs) { - KeyValue kv = KeyValueUtil.ensureKeyValue(c); - resetSeqId(smallestReadPoint, cleanSeqId, kv); - writer.append(kv); - ++progress.currentCompactedKVs; - progress.totalCompactedSize += kv.getLength(); - - // check periodically to see if a system stop is requested - if (closeCheckInterval > 0) { - bytesWritten += kv.getLength(); - if (bytesWritten > closeCheckInterval) { - // Log the progress of long running compactions every minute if - // logging at DEBUG level - if (LOG.isDebugEnabled()) { - long now = System.currentTimeMillis(); - if ((now - lastMillis) >= 60 * 1000) { - LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", - (bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0))); - lastMillis = now; + throughputController.start(compactionName); + try { + do { + hasMore = scanner.next(cells, compactionKVMax); + if (LOG.isDebugEnabled()) { + now = EnvironmentEdgeManager.currentTime(); + } + // output to writer: + for (Cell c : cells) { + if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { + CellUtil.setSequenceId(c, 0); + } + writer.append(c); + int len = KeyValueUtil.length(c); + ++progress.currentCompactedKVs; + progress.totalCompactedSize += len; + if (LOG.isDebugEnabled()) { + bytesWrittenProgress += len; + } + throughputController.control(compactionName, len); + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { + bytesWritten += len; + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; } } - bytesWritten = 0; - if (!store.areWritesEnabled()) { - progress.cancel(); - return false; - } } } - } - kvs.clear(); - } while (hasMore); + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + + compactionName + + " " + + progress + + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0) + / ((now - lastMillis) / 1000.0)) + ", throughputController is " + + throughputController); + lastMillis = now; + bytesWrittenProgress = 0; + } + } + cells.clear(); + } while (hasMore); + } catch (InterruptedException e) { + progress.cancel(); + throw new InterruptedIOException("Interrupted while control throughput of compacting " + + compactionName); + } finally { + throughputController.finish(compactionName); + } progress.complete(); return true; } @@@ -321,29 -338,4 +354,17 @@@ return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } + + /** - * Resets the sequence id. - * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. - * @param cleanSeqId Should clean the sequence id. - * @param kv The current KeyValue. - */ - protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) { - if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { - kv.setSequenceId(0); - } - } - - /** + * Appends the metadata and closes the writer. + * @param writer The current store writer. + * @param fd The file details. + * @param isMajor Is a major compaction. + * @throws IOException + */ + protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd, + boolean isMajor) throws IOException { + writer.appendMetadata(fd.maxSeqId, isMajor); + writer.close(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index be859c0,5d712c1..090be8c --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@@ -76,9 -98,11 +98,10 @@@ public class DefaultCompactor extends C smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } - - writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, - true, fd.maxTagsLength > 0); + writer = createTmpWriter(fd, smallestReadPoint); - boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - request.isAllFiles()); + boolean finished = - performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController); ++ performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController, ++ request.isAllFiles()); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); @@@ -102,22 -146,8 +145,22 @@@ } /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) + throws IOException { + StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, + true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); + return writer; + } + + /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest)}; + * {@link #compact(CompactionRequest, CompactionThroughputController)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. * @param isMajor true to major compact (prune all deletes, max versions, etc) http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 3109015,b957e16..fab4c2f --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@@ -127,8 -130,8 +130,9 @@@ public class StripeCompactor extends Co // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, - request.isMajor()); + finished = - performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController); ++ performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController, ++ request.isMajor()); if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index 0d7efe7,2655e2b..841bc04 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@@ -52,21 -51,16 +51,20 @@@ import org.apache.hadoop.hbase.HConstan import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.io.FileLink; import org.apache.hadoop.hbase.io.HFileLink; - import org.apache.hadoop.hbase.io.HLogLink; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.JobUtil; + import org.apache.hadoop.hbase.io.WALLink; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; - import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.BytesWritable; + import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; - import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@@ -423,10 -433,10 +437,10 @@@ public class ExportSnapshot extends Con switch (fileInfo.getType()) { case HFILE: Path inputPath = new Path(fileInfo.getHfile()); - link = HFileLink.buildFromHFileLinkPattern(conf, inputPath); + link = getFileLink(inputPath, conf); break; case WAL: - link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); + link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); break; default: throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); @@@ -442,16 -452,6 +456,16 @@@ } } + private FileLink getFileLink(Path path, Configuration conf) throws IOException{ + String regionName = HFileLink.getReferencedRegionName(path.getName()); + TableName tableName = HFileLink.getReferencedTableName(path.getName()); + if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { - return new HFileLink(MobUtils.getQualifiedMobRootDir(conf), ++ return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), + HFileArchiveUtil.getArchivePath(conf), path); + } - return new HFileLink(inputRoot, inputArchive, path); ++ return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); + } + private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { try { return fs.getFileChecksum(path); http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java index 6f7d847,330ead4..9d3407a --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java @@@ -34,17 -34,12 +34,18 @@@ import org.apache.hadoop.hbase.classifi import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.TableDescriptor; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java index 2a599d3,d1f787a..50b5c9a --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java @@@ -39,14 -39,11 +39,14 @@@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; - import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; + import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSVisitor; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; /** * Utility methods for interacting with the snapshot referenced files. @@@ -299,15 -296,7 +299,15 @@@ public final class SnapshotReferenceUti } // check if the linked file exists (in the archive, or in the table dir) - HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, linkPath); + HFileLink link = null; + if (MobUtils.isMobRegionInfo(regionInfo)) { + // for mob region - link = new HFileLink(MobUtils.getQualifiedMobRootDir(conf), ++ link = HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), + HFileArchiveUtil.getArchivePath(conf), linkPath); + } else { + // not mob region - link = new HFileLink(conf, linkPath); ++ link = HFileLink.buildFromHFileLinkPattern(conf, linkPath); + } try { FileStatus fstat = link.getFileStatus(fs); if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java index a939422,0000000..27d53ba mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java @@@ -1,250 -1,0 +1,251 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; - import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; ++import org.apache.hadoop.hbase.testclassification.ClientTests; ++import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test clone snapshots from the client + */ - @Category(LargeTests.class) ++@Category({LargeTests.class, ClientTests.class}) +public class TestMobCloneSnapshotFromClient { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final byte[] FAMILY = Bytes.toBytes("cf"); + + private byte[] emptySnapshot; + private byte[] snapshotName0; + private byte[] snapshotName1; + private byte[] snapshotName2; + private int snapshot0Rows; + private int snapshot1Rows; + private TableName tableName; + private Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Initialize the tests with a table filled with some data + * and two snapshots (snapshotName0, snapshotName1) of different states. + * The tableName, snapshotNames and the number of rows in the snapshot are initialized. + */ + @Before + public void setup() throws Exception { + this.admin = TEST_UTIL.getHBaseAdmin(); + + long tid = System.currentTimeMillis(); + tableName = TableName.valueOf("testtb-" + tid); + emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid); + snapshotName0 = Bytes.toBytes("snaptb0-" + tid); + snapshotName1 = Bytes.toBytes("snaptb1-" + tid); + snapshotName2 = Bytes.toBytes("snaptb2-" + tid); + + // create Table and disable it + MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY); + admin.disableTable(tableName); + + // take an empty snapshot + admin.snapshot(emptySnapshot, tableName); + + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + try { + // enable table and insert data + admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); ++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot + admin.snapshot(snapshotName0, tableName); + + // enable table and insert more data + admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); ++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); + + // re-enable table + admin.enableTable(tableName); + } finally { + table.close(); + } + } + + protected int getNumReplicas() { + return 1; + } + + @After + public void tearDown() throws Exception { + if (admin.tableExists(tableName)) { + TEST_UTIL.deleteTable(tableName); + } + SnapshotTestingUtils.deleteAllSnapshots(admin); + SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL); + } + + @Test(expected=SnapshotDoesNotExistException.class) + public void testCloneNonExistentSnapshot() throws IOException, InterruptedException { + String snapshotName = "random-snapshot-" + System.currentTimeMillis(); + TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName, tableName); + } + + @Test(expected = NamespaceNotFoundException.class) + public void testCloneOnMissingNamespace() throws IOException, InterruptedException { + TableName clonedTableName = TableName.valueOf("unknownNS:clonetb"); + admin.cloneSnapshot(snapshotName1, clonedTableName); + } + + @Test + public void testCloneSnapshot() throws IOException, InterruptedException { + TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis()); + testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows); + testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows); + testCloneSnapshot(clonedTableName, emptySnapshot, 0); + } + + private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName, + int snapshotRows) throws IOException, InterruptedException { + // create a new table from snapshot + admin.cloneSnapshot(snapshotName, tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows); + + verifyReplicasCameOnline(tableName); + TEST_UTIL.deleteTable(tableName); + } + + protected void verifyReplicasCameOnline(TableName tableName) throws IOException { + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + } + + @Test + public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException { + String nsName = "testCloneSnapshotCrossNamespace"; + admin.createNamespace(NamespaceDescriptor.create(nsName).build()); + TableName clonedTableName = + TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis()); + testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows); + testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows); + testCloneSnapshot(clonedTableName, emptySnapshot, 0); + } + + /** + * Verify that tables created from the snapshot are still alive after source table deletion. + */ + @Test + public void testCloneLinksAfterDelete() throws IOException, InterruptedException { + // Clone a table from the first snapshot + TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName0, clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + + // Take a snapshot of this cloned table. + admin.disableTable(clonedTableName); + admin.snapshot(snapshotName2, clonedTableName); + + // Clone the snapshot of the cloned table + TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName2, clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + admin.disableTable(clonedTableName2); + + // Remove the original table + TEST_UTIL.deleteTable(tableName); + waitCleanerRun(); + + // Verify the first cloned table + admin.enableTable(clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + + // Verify the second cloned table + admin.enableTable(clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + admin.disableTable(clonedTableName2); + + // Delete the first cloned table + TEST_UTIL.deleteTable(clonedTableName); + waitCleanerRun(); + + // Verify the second cloned table + admin.enableTable(clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + + // Clone a new table from cloned + TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName2, clonedTableName3); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows); + + // Delete the cloned tables + TEST_UTIL.deleteTable(clonedTableName2); + TEST_UTIL.deleteTable(clonedTableName3); + admin.deleteSnapshot(snapshotName2); + } + + // ========================================================================== + // Helpers + // ========================================================================== + + private void waitCleanerRun() throws InterruptedException { + TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java index c75e006,0000000..0bb498d mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java @@@ -1,303 -1,0 +1,304 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; - import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; ++import org.apache.hadoop.hbase.testclassification.ClientTests; ++import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test restore snapshots from the client + */ - @Category(LargeTests.class) ++@Category({ClientTests.class, LargeTests.class}) +public class TestMobRestoreSnapshotFromClient { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final byte[] FAMILY = Bytes.toBytes("cf"); + + private byte[] emptySnapshot; + private byte[] snapshotName0; + private byte[] snapshotName1; + private byte[] snapshotName2; + private int snapshot0Rows; + private int snapshot1Rows; + private TableName tableName; + private Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Initialize the tests with a table filled with some data + * and two snapshots (snapshotName0, snapshotName1) of different states. + * The tableName, snapshotNames and the number of rows in the snapshot are initialized. + */ + @Before + public void setup() throws Exception { + this.admin = TEST_UTIL.getHBaseAdmin(); + + long tid = System.currentTimeMillis(); + tableName = + TableName.valueOf("testtb-" + tid); + emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid); + snapshotName0 = Bytes.toBytes("snaptb0-" + tid); + snapshotName1 = Bytes.toBytes("snaptb1-" + tid); + snapshotName2 = Bytes.toBytes("snaptb2-" + tid); + + // create Table and disable it + MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY); + + admin.disableTable(tableName); + + // take an empty snapshot + admin.snapshot(emptySnapshot, tableName); + + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + // enable table and insert data + admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); ++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot + admin.snapshot(snapshotName0, tableName); + + // enable table and insert more data + admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); ++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table); + table.close(); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.deleteTable(tableName); + SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin()); + SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL); + } + + @Test + public void testRestoreSnapshot() throws IOException { + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + admin.disableTable(tableName); + admin.snapshot(snapshotName1, tableName); + // Restore from snapshot-0 + admin.restoreSnapshot(snapshotName0); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from emptySnapshot + admin.disableTable(tableName); + admin.restoreSnapshot(emptySnapshot); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, 0); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from snapshot-1 + admin.disableTable(tableName); + admin.restoreSnapshot(snapshotName1); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from snapshot-1 + TEST_UTIL.deleteTable(tableName); + admin.restoreSnapshot(snapshotName1); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + } + + protected int getNumReplicas() { + return 1; + } + + @Test + public void testRestoreSchemaChange() throws Exception { + byte[] TEST_FAMILY2 = Bytes.toBytes("cf2"); + + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + + // Add one column family and put some data in it + admin.disableTable(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2); + hcd.setMobEnabled(true); + hcd.setMobThreshold(3L); + admin.addColumn(tableName, hcd); + admin.enableTable(tableName); + assertEquals(2, table.getTableDescriptor().getFamilies().size()); + HTableDescriptor htd = admin.getTableDescriptor(tableName); + assertEquals(2, htd.getFamilies().size()); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2); ++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2); + long snapshot2Rows = snapshot1Rows + 500; + assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table)); + assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2)); + Set<String> fsFamilies = getFamiliesFromFS(tableName); + assertEquals(2, fsFamilies.size()); + + // Take a snapshot + admin.disableTable(tableName); + admin.snapshot(snapshotName2, tableName); + + // Restore the snapshot (without the cf) + admin.restoreSnapshot(snapshotName0); + admin.enableTable(tableName); + assertEquals(1, table.getTableDescriptor().getFamilies().size()); + try { + MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2); + fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exists"); + } catch (NoSuchColumnFamilyException e) { + // expected + } + assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table)); + htd = admin.getTableDescriptor(tableName); + assertEquals(1, htd.getFamilies().size()); + fsFamilies = getFamiliesFromFS(tableName); + assertEquals(1, fsFamilies.size()); + + // Restore back the snapshot (with the cf) + admin.disableTable(tableName); + admin.restoreSnapshot(snapshotName2); + admin.enableTable(tableName); + htd = admin.getTableDescriptor(tableName); + assertEquals(2, htd.getFamilies().size()); + assertEquals(2, table.getTableDescriptor().getFamilies().size()); + assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2)); + assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table)); + fsFamilies = getFamiliesFromFS(tableName); + assertEquals(2, fsFamilies.size()); + table.close(); + } + + @Test + public void testCloneSnapshotOfCloned() throws IOException, InterruptedException { + TableName clonedTableName = + TableName.valueOf("clonedtb-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName0, clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas()); + admin.disableTable(clonedTableName); + admin.snapshot(snapshotName2, clonedTableName); + TEST_UTIL.deleteTable(clonedTableName); + waitCleanerRun(); + + admin.cloneSnapshot(snapshotName2, clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas()); + TEST_UTIL.deleteTable(clonedTableName); + } + + @Test + public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException { + TEST_UTIL.deleteTable(tableName); + waitCleanerRun(); + + admin.cloneSnapshot(snapshotName0, tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + waitCleanerRun(); + + admin.disableTable(tableName); + admin.restoreSnapshot(snapshotName0); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + } + + @Test + public void testCorruptedSnapshot() throws IOException, InterruptedException { + SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0)); + TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis()); + try { + admin.cloneSnapshot(snapshotName0, cloneName); + fail("Expected CorruptedSnapshotException, got succeeded cloneSnapshot()"); + } catch (CorruptedSnapshotException e) { + // Got the expected corruption exception. + // check for no references of the cloned table. + assertFalse(admin.tableExists(cloneName)); + } catch (Exception e) { + fail("Expected CorruptedSnapshotException got: " + e); + } + } + + // ========================================================================== + // Helpers + // ========================================================================== + private void waitCleanerRun() throws InterruptedException { + TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); + } + + private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException { + MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem(); + Set<String> families = new HashSet<String>(); + Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName); + for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) { + for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) { + families.add(familyDir.getName()); + } + } + return families; + } +}
