http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java index d286b72,0000000..37d4461 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java @@@ -1,180 -1,0 +1,182 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; ++import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; +import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MemStore; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Reducer.Context; + +/** + * The wrapper of a DefaultMemStore. + * This wrapper is used in the sweep reducer to buffer and sort the cells written from + * the invalid and small mob files. + * When it is full, it is flushed: the mob data are written to new mob files, and the names of + * these files are written back to the store files of HBase. + * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key and + * date); within each group, the reducer iterates over the files and writes their cells into a + * new, bigger mob file. + * The cells within a single mob file are ordered, but cells across mob files are not.
+ * This MemStoreWrapper therefore sorts the cells that come from different mob files before + * flushing them to disk; once the memStore is big enough, it is flushed as a new mob file. + */ +@InterfaceAudience.Private +public class MemStoreWrapper { + + private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class); + + private MemStore memstore; + private long flushSize; + private SweepPartitionId partitionId; + private Context context; + private Configuration conf; - private HTable table; ++ private BufferedMutator table; + private HColumnDescriptor hcd; + private Path mobFamilyDir; + private FileSystem fs; + private CacheConfig cacheConfig; + - public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd, ++ public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table, HColumnDescriptor hcd, + MemStore memstore, CacheConfig cacheConfig) throws IOException { + this.memstore = memstore; + this.context = context; + this.fs = fs; + this.table = table; + this.hcd = hcd; + this.conf = context.getConfiguration(); + this.cacheConfig = cacheConfig; + flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE, + MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE); + mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString()); + } + + public void setPartitionId(SweepPartitionId partitionId) { + this.partitionId = partitionId; + } + + /** + * Flushes the memstore if its heap size reaches the configured flush size. + * @throws IOException + */ + private void flushMemStoreIfNecessary() throws IOException { + if (memstore.heapSize() >= flushSize) { + flushMemStore(); + } + } + + /** + * Flushes the memstore unconditionally. + * @throws IOException + */ + public void flushMemStore() throws IOException { + MemStoreSnapshot snapshot = memstore.snapshot(); + internalFlushCache(snapshot); + memstore.clearSnapshot(snapshot.getId()); + } + + /** + * Flushes the snapshot of the memstore: writes the mob data to mob files, and writes the + * names of these mob files back to HBase. + * @param snapshot The snapshot of the memstore. + * @throws IOException + */ + private void internalFlushCache(final MemStoreSnapshot snapshot) + throws IOException { + if (snapshot.getCellsCount() == 0) { + return; + } + // generate the files into a temp directory. + String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY); + StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, + partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(), + hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig); + + String relativePath = mobFileWriter.getPath().getName(); + LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString()); + + byte[] referenceValue = Bytes.toBytes(relativePath); + KeyValueScanner scanner = snapshot.getScanner(); + Cell cell = null; + while (null != (cell = scanner.next())) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + mobFileWriter.append(kv); + } + scanner.close(); + // Write out the log sequence number that corresponds to this output + // hfile. The hfile is current up to and including logCacheFlushId.
+ mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount()); + mobFileWriter.close(); + + MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig); + context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1); + // write reference/fileName back to the store files of HBase. + scanner = snapshot.getScanner(); + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + cell = null; - Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName()); ++ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName().toString())); + while (null != (cell = scanner.next())) { + KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag); + Put put = + new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()); + put.add(reference); - table.put(put); ++ table.mutate(put); + context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1); + } - table.flushCommits(); ++ table.flush(); + scanner.close(); + } + + /** + * Adds a KeyValue into the memstore. + * @param kv The KeyValue to be added. + * @throws IOException + */ + public void addToMemstore(KeyValue kv) throws IOException { + memstore.add(kv); + // flush the memstore if it's full. + flushMemStoreIfNecessary(); + } + +}
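For context on the HTable-to-BufferedMutator migration in MemStoreWrapper above, here is a minimal, self-contained sketch of the buffered write path against the HBase 1.0-era client API; the table, family, row, and value names are illustrative only, not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedMutatorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // A Connection is heavyweight and thread-safe; create one and share it.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         // BufferedMutator replaces HTable#setAutoFlush(false) + flushCommits():
         // mutations are buffered client-side and sent when the buffer fills.
         BufferedMutator mutator = connection.getBufferedMutator(
             new BufferedMutatorParams(TableName.valueOf("demo")) // hypothetical table
                 .writeBufferSize(1 * 1024 * 1024))) {            // 1MB, as in the sweep code
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      mutator.mutate(put); // buffered write; was HTable#put(Put)
      mutator.flush();     // force the buffer out; was HTable#flushCommits()
    }
  }
}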
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java index 73ca1a2,0000000..cbefd8a mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java @@@ -1,512 -1,0 +1,509 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.InvalidFamilyOperationException; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; - import org.apache.hadoop.hbase.client.HBaseAdmin; - import org.apache.hadoop.hbase.client.HTable; ++import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFile; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import 
org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.zookeeper.KeeperException; + +/** + * The reducer of a sweep job. + * This reducer merges the small mob files into bigger ones, and writes the names of the + * visited mob files to a sequence file, which is used by the sweep job to delete + * the unused mob files. + * The input key is a file name; the input value is a collection of KeyValues, where each + * KeyValue is the actual cell in HBase (its value has the format valueLength + fileName). + * This tells the reducer how many cells still exist in HBase for a given mob file. + * If existingCellSize/mobFileSize < compactionRatio, this mob + * file needs to be merged. + */ +@InterfaceAudience.Private +public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { + + private static final Log LOG = LogFactory.getLog(SweepReducer.class); + + private SequenceFile.Writer writer = null; + private MemStoreWrapper memstore; + private Configuration conf; + private FileSystem fs; + + private Path familyDir; + private CacheConfig cacheConfig; + private long compactionBegin; - private HTable table; ++ private BufferedMutator table; + private HColumnDescriptor family; + private long mobCompactionDelay; + private Path mobTableDir; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + this.conf = context.getConfiguration(); ++ Connection c = ConnectionFactory.createConnection(this.conf); + this.fs = FileSystem.get(conf); + // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing. + mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY); + String tableName = conf.get(TableInputFormat.INPUT_TABLE); + String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY); + TableName tn = TableName.valueOf(tableName); + this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName); - HBaseAdmin admin = new HBaseAdmin(this.conf); ++ Admin admin = c.getAdmin(); + try { + family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName)); + if (family == null) { + // the column family might have been removed; fail fast. + throw new InvalidFamilyOperationException("Column family '" + familyName + + "' does not exist. It might be removed."); + } + } finally { + try { + admin.close(); + } catch (IOException e) { + LOG.warn("Fail to close the HBaseAdmin", e); + } + } + // disable the block cache. + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + this.cacheConfig = new CacheConfig(copyOfConf); + - table = new HTable(this.conf, Bytes.toBytes(tableName)); - table.setAutoFlush(false, false); - - table.setWriteBufferSize(1 * 1024 * 1024); // 1MB ++ table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024)); + memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig); + + // The start time of the sweep tool. + // Only the mob files whose creation time is older than startTime-oneDay will be handled by the + // reducer, since handling the latest mob files would introduce inconsistencies.
+ this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0); + mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn); + } + + private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException { + return new SweepPartition(id, context); + } + + @Override + public void run(Context context) throws IOException, InterruptedException { + String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID); + String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME); + String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId, + new DummyMobAbortable()); + FSDataOutputStream fout = null; + try { + SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner); + tracker.start(); + setup(context); + // create a sequence file that contains all the file names visited by this reducer. + String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY); + Path nameFilePath = new Path(dir, UUID.randomUUID().toString() + .replace("-", MobConstants.EMPTY_STRING)); + fout = fs.create(nameFilePath, true); + writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class, + String.class, CompressionType.NONE, null); + SweepPartitionId id; + SweepPartition partition = null; + // the mob files which have the same start key and date are in the same partition. + while (context.nextKey()) { + Text key = context.getCurrentKey(); + String keyString = key.toString(); + id = SweepPartitionId.create(keyString); + if (null == partition || !id.equals(partition.getId())) { + // It's the first mob file in a new partition. + if (null != partition) { + // this mob file is in a different partition from the previous one, + // so close the previous partition. + partition.close(); + } + // create a new one + partition = createPartition(id, context); + } + if (partition != null) { + // run the partition + partition.execute(key, context.getValues()); + } + } + if (null != partition) { + partition.close(); + } + writer.hflush(); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + cleanup(context); + zkw.close(); + if (writer != null) { + IOUtils.closeStream(writer); + } + if (fout != null) { + IOUtils.closeStream(fout); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + } + + /** + * The mob files which have the same start key and date are in the same partition. + * The files in the same partition are merged together into bigger ones. + */ + public class SweepPartition { + + private final SweepPartitionId id; + private final Context context; + private boolean memstoreUpdated = false; + private boolean mergeSmall = false; + private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>(); + private final List<Path> toBeDeleted = new ArrayList<Path>(); + + public SweepPartition(SweepPartitionId id, Context context) throws IOException { + this.id = id; + this.context = context; + memstore.setPartitionId(id); + init(); + } + + public SweepPartitionId getId() { + return this.id; + } + + /** + * Prepares the map of files.
+ * + * @throws IOException + */ + private void init() throws IOException { + FileStatus[] fileStats = listStatus(familyDir, id.getStartKey()); + if (null == fileStats) { + return; + } + + int smallFileCount = 0; + float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO); + long compactionMergeableSize = conf.getLong( + MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE, + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE); + // list the files. Just merge the hfiles, don't merge the hfile links. + // prepare the map of mob files. The key is the file name, the value is the file status. + for (FileStatus fileStat : fileStats) { + MobFileStatus mobFileStatus = null; + if (!HFileLink.isHFileLink(fileStat.getPath())) { + mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize); + if (mobFileStatus.needMerge()) { + smallFileCount++; + } + // key is file name (not hfile name), value is hfile status. + fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus); + } + } + if (smallFileCount >= 2) { + // merge the files only when there is more than one small file in the same partition. + this.mergeSmall = true; + } + } + + /** + * Flushes the data into mob files and store files, and archives the small + * files after they're merged. + * @throws IOException + */ + public void close() throws IOException { + if (null == id) { + return; + } + // flush the remaining key values into mob files + if (memstoreUpdated) { + memstore.flushMemStore(); + } + List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size()); + // delete the small files after compaction + for (Path path : toBeDeleted) { + LOG.info("[In Partition close] Delete the file " + path + " in partition close"); + storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE)); + } + if (!storeFiles.isEmpty()) { + try { + MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(), + storeFiles); + context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size()); + } catch (IOException e) { + LOG.error("Fail to archive the store files " + storeFiles, e); + } + storeFiles.clear(); + } + fileStatusMap.clear(); + } + + /** + * Merges the small mob files into bigger ones. + * @param fileName The current mob file name. + * @param values The collection of KeyValues in this mob file. + * @throws IOException + */ + public void execute(Text fileName, Iterable<KeyValue> values) throws IOException { + if (null == values) { + return; + } + MobFileName mobFileName = MobFileName.create(fileName.toString()); + LOG.info("[In reducer] The file name: " + fileName.toString()); + MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName()); + if (null == mobFileStat) { + LOG.info("[In reducer] Cannot find the file, probably this record is obsolete"); + return; + } + // only handle the files that are older than one day. + if (compactionBegin - mobFileStat.getFileStatus().getModificationTime() + <= mobCompactionDelay) { + return; + } + // write the hfile name + writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING); + Set<KeyValue> kvs = new HashSet<KeyValue>(); + for (KeyValue kv : values) { + if (kv.getValueLength() > Bytes.SIZEOF_INT) { + mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(), + Bytes.SIZEOF_INT)); + } + kvs.add(kv.createKeyOnly(false)); + } + // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+ if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) { + context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1); + MobFile file = MobFile.create(fs, + new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig); + StoreFileScanner scanner = null; + try { + scanner = file.getScanner(); + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY)); + Cell cell; + while (null != (cell = scanner.next())) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue keyOnly = kv.createKeyOnly(false); + if (kvs.contains(keyOnly)) { + // write the KeyValue existing in HBase to the memstore. + memstore.addToMemstore(kv); + memstoreUpdated = true; + } + } + } finally { + if (scanner != null) { + scanner.close(); + } + } + toBeDeleted.add(mobFileStat.getFileStatus().getPath()); + } + } + + /** + * Lists the files with the same prefix. + * @param p The file path. + * @param prefix The prefix. + * @return The files with the same prefix. + * @throws IOException + */ + private FileStatus[] listStatus(Path p, String prefix) throws IOException { + return fs.listStatus(p, new PathPrefixFilter(prefix)); + } + } + + static class PathPrefixFilter implements PathFilter { + + private final String prefix; + + public PathPrefixFilter(String prefix) { + this.prefix = prefix; + } + + public boolean accept(Path path) { + return path.getName().startsWith(prefix, 0); + } + + } + + /** + * The sweep partition id. + * It consists of the start key and date. + * The start key is a hex string of the checksum of a region start key. + * The date is the latest timestamp of cells in a mob file. + */ + public static class SweepPartitionId { + private String date; + private String startKey; + + public SweepPartitionId(MobFileName fileName) { + this.date = fileName.getDate(); + this.startKey = fileName.getStartKey(); + } + + public SweepPartitionId(String date, String startKey) { + this.date = date; + this.startKey = startKey; + } + + public static SweepPartitionId create(String key) { + return new SweepPartitionId(MobFileName.create(key)); + } + + @Override + public boolean equals(Object anObject) { + if (this == anObject) { + return true; + } + if (anObject instanceof SweepPartitionId) { + SweepPartitionId another = (SweepPartitionId) anObject; + if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) { + return true; + } + } + return false; + } + + public String getDate() { + return this.date; + } + + public String getStartKey() { + return this.startKey; + } + + public void setDate(String date) { + this.date = date; + } + + public void setStartKey(String startKey) { + this.startKey = startKey; + } + } + + /** + * The mob file status used in the sweep reducer. + */ + private static class MobFileStatus { + private FileStatus fileStatus; + private int validSize; + private long size; + + private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO; + private long compactionMergeableSize = + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE; + + /** + * @param fileStatus The current FileStatus. + * @param compactionRatio The invalid ratio. + * If too many cells have been deleted in a mob file, it is regarded as invalid, + * and needs to be rewritten to a new one. + * If existingCellSize/fileSize < compactionRatio, it is regarded as an invalid one.
+ * @param compactionMergeableSize If the size of a mob file is less + * than this value, it is regarded as a small file and needs to be merged + */ + public MobFileStatus(FileStatus fileStatus, float compactionRatio, + long compactionMergeableSize) { + this.fileStatus = fileStatus; + this.size = fileStatus.getLen(); + validSize = 0; + this.compactionRatio = compactionRatio; + this.compactionMergeableSize = compactionMergeableSize; + } + + /** + * Adds size to this file. + * @param size The size to be added. + */ + public void addValidSize(int size) { + this.validSize += size; + } + + /** + * Whether the mob file needs to be cleaned. + * If too many cells have been deleted in this mob file, it needs to be cleaned. + * @return True if it needs to be cleaned. + */ + public boolean needClean() { + return validSize < compactionRatio * size; + } + + /** + * Whether the mob file needs to be merged. + * If this mob file is too small, it needs to be merged. + * @return True if it needs to be merged. + */ + public boolean needMerge() { + return this.size < compactionMergeableSize; + } + + /** + * Gets the file status. + * @return The file status. + */ + public FileStatus getFileStatus() { + return fileStatus; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 73b8cb9,73b8cb9..8ff4840 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@@ -85,7 -85,7 +85,7 @@@ public class DefaultStoreFlusher extend scanner.close(); } LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize=" -- + StringUtils.humanReadableInt(snapshot.getSize()) + ++ + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) + ", hasBloomFilter=" + writer.hasGeneralBloom() + ", into tmp file " + writer.getPath()); result.add(writer.getPath()); http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ab0165d,e082698..6684309 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@@ -3276,34 -3421,12 +3422,30 @@@ public class HRegion implements HeapSiz Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir); SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(), - snapshotDir, desc, exnSnare); + snapshotDir, desc, exnSnare); manifest.addRegion(this); + + // The regionserver holding the first region of the table is responsible for taking the + // manifest of the mob dir. - if (!Bytes.equals(getStartKey(), HConstants.EMPTY_START_ROW)) ++ if (!Bytes.equals(getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW)) + return; + + // if any column family has MOB enabled, add the "mob region" to the manifest.
- Map<byte[], Store> stores = getStores(); - for (Entry<byte[], Store> store : stores.entrySet()) { - boolean hasMobStore = store.getValue().getFamily().isMobEnabled(); ++ List<Store> stores = getStores(); ++ for (Store store : stores) { ++ boolean hasMobStore = store.getFamily().isMobEnabled(); + if (hasMobStore) { + // use the .mob as the start key and 0 as the regionid + HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(this.getTableDesc().getTableName()); + mobRegionInfo.setOffline(true); + manifest.addMobRegion(mobRegionInfo, this.getTableDesc().getColumnFamilies()); + return; + } + } } - /** - * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the - * provided current timestamp. - * @throws IOException - */ - void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now) + @Override + public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now) throws IOException { for (List<Cell> cells: cellItr) { if (cells == null) continue; http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 159ec55,8f7dee4..ea9558f --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@@ -549,27 -461,19 +560,28 @@@ class MetricsRegionServerWrapperImp long tempFlushedCellsSize = 0; long tempCompactedCellsSize = 0; long tempMajorCompactedCellsSize = 0; + long tempMobCompactedIntoMobCellsCount = 0; + long tempMobCompactedFromMobCellsCount = 0; + long tempMobCompactedIntoMobCellsSize = 0; + long tempMobCompactedFromMobCellsSize = 0; + long tempMobFlushCount = 0; + long tempMobFlushedCellsCount = 0; + long tempMobFlushedCellsSize = 0; + long tempMobScanCellsCount = 0; + long tempMobScanCellsSize = 0; long tempBlockedRequestsCount = 0L; - for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { - tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); - tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get(); - tempReadRequestsCount += r.readRequestsCount.get(); - tempWriteRequestsCount += r.writeRequestsCount.get(); - tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get(); - tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get(); + for (Region r : regionServer.getOnlineRegionsLocalContext()) { + tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL(); + tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL(); + tempReadRequestsCount += r.getReadRequestsCount(); + tempWriteRequestsCount += r.getWriteRequestsCount(); + tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed(); + tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed(); tempBlockedRequestsCount += r.getBlockedRequestsCount(); - tempNumStores += 
r.stores.size(); - for (Store store : r.stores.values()) { + List<Store> storeList = r.getStores(); + tempNumStores += storeList.size(); + for (Store store : storeList) { + tempNumStoreFiles += store.getStorefilesCount(); + tempMemstoreSize += store.getMemStoreSize(); + tempStoreFileSize += store.getStorefilesSize(); @@@ -582,21 -486,13 +594,25 @@@ tempFlushedCellsSize += store.getFlushedCellsSize(); tempCompactedCellsSize += store.getCompactedCellsSize(); tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize(); + if (store instanceof HMobStore) { + HMobStore mobStore = (HMobStore) store; + tempMobCompactedIntoMobCellsCount += mobStore.getMobCompactedIntoMobCellsCount(); + tempMobCompactedFromMobCellsCount += mobStore.getMobCompactedFromMobCellsCount(); + tempMobCompactedIntoMobCellsSize += mobStore.getMobCompactedIntoMobCellsSize(); + tempMobCompactedFromMobCellsSize += mobStore.getMobCompactedFromMobCellsSize(); + tempMobFlushCount += mobStore.getMobFlushCount(); + tempMobFlushedCellsCount += mobStore.getMobFlushedCellsCount(); + tempMobFlushedCellsSize += mobStore.getMobFlushedCellsSize(); + tempMobScanCellsCount += mobStore.getMobScanCellsCount(); + tempMobScanCellsSize += mobStore.getMobScanCellsSize(); + } } - hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); + HDFSBlocksDistribution distro = r.getHDFSBlocksDistribution(); + hdfsBlocksDistribution.add(distro); + if (r.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + hdfsBlocksDistributionSecondaryRegions.add(distro); + } } float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex( http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java index f7f0acd,0000000..5739df1 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@@ -1,80 -1,0 +1,80 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * Scanner scans both the memstore and the MOB store. Coalesces the KeyValue stream into a + * List<KeyValue> for a single row.
+ * + */ +@InterfaceAudience.Private +public class MobStoreScanner extends StoreScanner { + + private boolean cacheMobBlocks = false; + private final HMobStore mobStore; + + public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + final NavigableSet<byte[]> columns, long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); + if (!(store instanceof HMobStore)) { + throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); + } + mobStore = (HMobStore) store; + } + + /** + * First reads the cells from HBase. If a cell is a reference cell (one that carries the + * reference tag), the scanner seeks the cell in the mob file and uses the cell found + * there as the result. + */ + @Override - public boolean next(List<Cell> outResult, int limit) throws IOException { - boolean result = super.next(outResult, limit); ++ public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException { ++ boolean result = super.next(outResult, ctx); + if (!MobUtils.isRawMobScan(scan)) { + // retrieve the mob data + if (outResult.isEmpty()) { + return result; + } + long mobKVCount = 0; + long mobKVSize = 0; + for (int i = 0; i < outResult.size(); i++) { + Cell cell = outResult.get(i); + if (MobUtils.isMobReferenceCell(cell)) { + Cell mobCell = mobStore.resolve(cell, cacheMobBlocks); + mobKVCount++; + mobKVSize += mobCell.getValueLength(); + outResult.set(i, mobCell); + } + } + mobStore.updateMobScanCellsCount(mobKVCount); + mobStore.updateMobScanCellsSize(mobKVSize); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java index 4c46218,0000000..85be382 mode 100644,000000..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@@ -1,80 -1,0 +1,80 @@@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * ReversedMobStoreScanner extends ReversedStoreScanner and is used to support + * reversed scanning in both the memstore and the MOB store.
+ * + */ +@InterfaceAudience.Private +public class ReversedMobStoreScanner extends ReversedStoreScanner { + + private boolean cacheMobBlocks = false; + protected final HMobStore mobStore; + + ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, + long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); + if (!(store instanceof HMobStore)) { + throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); + } + mobStore = (HMobStore) store; + } + + /** + * First reads the cells from HBase. If a cell is a reference cell (one that carries the + * reference tag), the scanner seeks the cell in the mob file and uses the cell found + * there as the result. + */ + @Override - public boolean next(List<Cell> outResult, int limit) throws IOException { - boolean result = super.next(outResult, limit); ++ public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException { ++ boolean result = super.next(outResult, ctx); + if (!MobUtils.isRawMobScan(scan)) { + // retrieve the mob data + if (outResult.isEmpty()) { + return result; + } + long mobKVCount = 0; + long mobKVSize = 0; + for (int i = 0; i < outResult.size(); i++) { + Cell cell = outResult.get(i); + if (MobUtils.isMobReferenceCell(cell)) { + Cell mobCell = mobStore.resolve(cell, cacheMobBlocks); + mobKVCount++; + mobKVSize += mobCell.getValueLength(); + outResult.set(i, mobCell); + } + } + mobStore.updateMobScanCellsCount(mobKVCount); + mobStore.updateMobScanCellsSize(mobKVSize); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java index 27d53ba,0000000..60fc0ff mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java @@@ -1,251 -1,0 +1,252 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test clone snapshots from the client + */ +@Category({LargeTests.class, ClientTests.class}) +public class TestMobCloneSnapshotFromClient { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final byte[] FAMILY = Bytes.toBytes("cf"); + + private byte[] emptySnapshot; + private byte[] snapshotName0; + private byte[] snapshotName1; + private byte[] snapshotName2; + private int snapshot0Rows; + private int snapshot1Rows; + private TableName tableName; + private Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Initialize the tests with a table filled with some data + * and two snapshots (snapshotName0, snapshotName1) of different states. + * The tableName, snapshotNames and the number of rows in the snapshot are initialized. 
+ */ + @Before + public void setup() throws Exception { + this.admin = TEST_UTIL.getHBaseAdmin(); + + long tid = System.currentTimeMillis(); + tableName = TableName.valueOf("testtb-" + tid); + emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid); + snapshotName0 = Bytes.toBytes("snaptb0-" + tid); + snapshotName1 = Bytes.toBytes("snaptb1-" + tid); + snapshotName2 = Bytes.toBytes("snaptb2-" + tid); + + // create Table and disable it + MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY); + admin.disableTable(tableName); + + // take an empty snapshot + admin.snapshot(emptySnapshot, tableName); + - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); ++ Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); ++ Table table = c.getTable(tableName); + try { + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot + admin.snapshot(snapshotName0, tableName); + + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); + + // re-enable table + admin.enableTable(tableName); + } finally { + table.close(); + } + } + + protected int getNumReplicas() { + return 1; + } + + @After + public void tearDown() throws Exception { + if (admin.tableExists(tableName)) { + TEST_UTIL.deleteTable(tableName); + } + SnapshotTestingUtils.deleteAllSnapshots(admin); + SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL); + } + + @Test(expected=SnapshotDoesNotExistException.class) + public void testCloneNonExistentSnapshot() throws IOException, InterruptedException { + String snapshotName = "random-snapshot-" + System.currentTimeMillis(); + TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName, tableName); + } + + @Test(expected = NamespaceNotFoundException.class) + public void testCloneOnMissingNamespace() throws IOException, InterruptedException { + TableName clonedTableName = TableName.valueOf("unknownNS:clonetb"); + admin.cloneSnapshot(snapshotName1, clonedTableName); + } + + @Test + public void testCloneSnapshot() throws IOException, InterruptedException { + TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis()); + testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows); + testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows); + testCloneSnapshot(clonedTableName, emptySnapshot, 0); + } + + private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName, + int snapshotRows) throws IOException, InterruptedException { + // create a new table from snapshot + admin.cloneSnapshot(snapshotName, tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows); + + verifyReplicasCameOnline(tableName); + TEST_UTIL.deleteTable(tableName); + } + + protected void verifyReplicasCameOnline(TableName tableName) throws IOException { + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + } + + @Test + public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException { + String nsName = 
"testCloneSnapshotCrossNamespace"; + admin.createNamespace(NamespaceDescriptor.create(nsName).build()); + TableName clonedTableName = + TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis()); + testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows); + testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows); + testCloneSnapshot(clonedTableName, emptySnapshot, 0); + } + + /** + * Verify that tables created from the snapshot are still alive after source table deletion. + */ + @Test + public void testCloneLinksAfterDelete() throws IOException, InterruptedException { + // Clone a table from the first snapshot + TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName0, clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + + // Take a snapshot of this cloned table. + admin.disableTable(clonedTableName); + admin.snapshot(snapshotName2, clonedTableName); + + // Clone the snapshot of the cloned table + TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName2, clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + admin.disableTable(clonedTableName2); + + // Remove the original table + TEST_UTIL.deleteTable(tableName); + waitCleanerRun(); + + // Verify the first cloned table + admin.enableTable(clonedTableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows); + + // Verify the second cloned table + admin.enableTable(clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + admin.disableTable(clonedTableName2); + + // Delete the first cloned table + TEST_UTIL.deleteTable(clonedTableName); + waitCleanerRun(); + + // Verify the second cloned table + admin.enableTable(clonedTableName2); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows); + + // Clone a new table from cloned + TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis()); + admin.cloneSnapshot(snapshotName2, clonedTableName3); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows); + + // Delete the cloned tables + TEST_UTIL.deleteTable(clonedTableName2); + TEST_UTIL.deleteTable(clonedTableName3); + admin.deleteSnapshot(snapshotName2); + } + + // ========================================================================== + // Helpers + // ========================================================================== + + private void waitCleanerRun() throws InterruptedException { + TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java ---------------------------------------------------------------------- diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java index 0bb498d,0000000..6fc2d28 mode 100644,000000..100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java @@@ -1,304 -1,0 +1,306 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test restore snapshots from the client + */ +@Category({ClientTests.class, LargeTests.class}) +public class TestMobRestoreSnapshotFromClient { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final byte[] FAMILY = Bytes.toBytes("cf"); + + private byte[] emptySnapshot; + private byte[] snapshotName0; + private byte[] snapshotName1; + private byte[] snapshotName2; + private int snapshot0Rows; + private int snapshot1Rows; + private TableName tableName; + private Admin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + 
public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Initialize the tests with a table filled with some data + * and two snapshots (snapshotName0, snapshotName1) of different states. + * The tableName, snapshotNames and the number of rows in the snapshot are initialized. + */ + @Before + public void setup() throws Exception { + this.admin = TEST_UTIL.getHBaseAdmin(); + + long tid = System.currentTimeMillis(); + tableName = + TableName.valueOf("testtb-" + tid); + emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid); + snapshotName0 = Bytes.toBytes("snaptb0-" + tid); + snapshotName1 = Bytes.toBytes("snaptb1-" + tid); + snapshotName2 = Bytes.toBytes("snaptb2-" + tid); + + // create Table and disable it + MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY); + + admin.disableTable(tableName); + + // take an empty snapshot + admin.snapshot(emptySnapshot, tableName); + - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); ++ Table table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) ++ .getTable(tableName); + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table); + admin.disableTable(tableName); + + // take a snapshot + admin.snapshot(snapshotName0, tableName); + + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table); + table.close(); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.deleteTable(tableName); + SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin()); + SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL); + } + + @Test + public void testRestoreSnapshot() throws IOException { + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + admin.disableTable(tableName); + admin.snapshot(snapshotName1, tableName); + // Restore from snapshot-0 + admin.restoreSnapshot(snapshotName0); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from emptySnapshot + admin.disableTable(tableName); + admin.restoreSnapshot(emptySnapshot); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, 0); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from snapshot-1 + admin.disableTable(tableName); + admin.restoreSnapshot(snapshotName1); + admin.enableTable(tableName); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + + // Restore from snapshot-1 + TEST_UTIL.deleteTable(tableName); + admin.restoreSnapshot(snapshotName1); + MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows); + SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas()); + } + + protected int getNumReplicas() { + return 1; + } + + @Test + public void testRestoreSchemaChange() throws Exception { + byte[] TEST_FAMILY2 = Bytes.toBytes("cf2"); + - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); ++ Table table = 
++        ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++        .getTable(tableName);
+
+    // Add one column family and put some data in it
+    admin.disableTable(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(3L);
+    admin.addColumn(tableName, hcd);
+    admin.enableTable(tableName);
+    assertEquals(2, table.getTableDescriptor().getFamilies().size());
+    HTableDescriptor htd = admin.getTableDescriptor(tableName);
+    assertEquals(2, htd.getFamilies().size());
+    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
+    long snapshot2Rows = snapshot1Rows + 500;
+    assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+    assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+    Set<String> fsFamilies = getFamiliesFromFS(tableName);
+    assertEquals(2, fsFamilies.size());
+
+    // Take a snapshot
+    admin.disableTable(tableName);
+    admin.snapshot(snapshotName2, tableName);
+
+    // Restore the snapshot (without the cf)
+    admin.restoreSnapshot(snapshotName0);
+    admin.enableTable(tableName);
+    assertEquals(1, table.getTableDescriptor().getFamilies().size());
+    try {
+      MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2);
+      fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exist");
+    } catch (NoSuchColumnFamilyException e) {
+      // expected
+    }
+    assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table));
+    htd = admin.getTableDescriptor(tableName);
+    assertEquals(1, htd.getFamilies().size());
+    fsFamilies = getFamiliesFromFS(tableName);
+    assertEquals(1, fsFamilies.size());
+
+    // Restore the snapshot again (with the cf)
+    admin.disableTable(tableName);
+    admin.restoreSnapshot(snapshotName2);
+    admin.enableTable(tableName);
+    htd = admin.getTableDescriptor(tableName);
+    assertEquals(2, htd.getFamilies().size());
+    assertEquals(2, table.getTableDescriptor().getFamilies().size());
+    assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+    assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+    fsFamilies = getFamiliesFromFS(tableName);
+    assertEquals(2, fsFamilies.size());
+    table.close();
+  }
+
+  @Test
+  public void testCloneSnapshotOfCloned() throws IOException, InterruptedException {
+    TableName clonedTableName =
+        TableName.valueOf("clonedtb-" + System.currentTimeMillis());
+    admin.cloneSnapshot(snapshotName0, clonedTableName);
+    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+    SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+    admin.disableTable(clonedTableName);
+    admin.snapshot(snapshotName2, clonedTableName);
+    TEST_UTIL.deleteTable(clonedTableName);
+    waitCleanerRun();
+
+    admin.cloneSnapshot(snapshotName2, clonedTableName);
+    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+    SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+    TEST_UTIL.deleteTable(clonedTableName);
+  }
+
+  @Test
+  public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException {
+    TEST_UTIL.deleteTable(tableName);
+    waitCleanerRun();
+
+    admin.cloneSnapshot(snapshotName0, tableName);
+    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+    waitCleanerRun();
+
+    admin.disableTable(tableName);
+    admin.restoreSnapshot(snapshotName0);
+    admin.enableTable(tableName);
+    MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+    SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+  }
+
+  @Test
+  public void testCorruptedSnapshot() throws IOException, InterruptedException {
+    SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0));
+    TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis());
+    try {
+      admin.cloneSnapshot(snapshotName0, cloneName);
+      fail("Expected CorruptedSnapshotException, but cloneSnapshot() succeeded");
+    } catch (CorruptedSnapshotException e) {
+      // Got the expected corruption exception.
+      // Check that there are no references to the cloned table.
+      assertFalse(admin.tableExists(cloneName));
+    } catch (Exception e) {
+      fail("Expected CorruptedSnapshotException, got: " + e);
+    }
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private void waitCleanerRun() throws InterruptedException {
+    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+  }
+
+  private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException {
+    MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Set<String> families = new HashSet<String>();
+    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
+    for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) {
+      for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) {
+        families.add(familyDir.getName());
+      }
+    }
+    return families;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
index 612b98a,0000000..a2cd51c
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
@@@ -1,376 -1,0 +1,375 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to verify that the cloned table is independent of the table from which it was cloned
+ */
+@Category(LargeTests.class)
+public class TestMobSnapshotCloneIndependence {
+  private static final Log LOG = LogFactory.getLog(TestMobSnapshotCloneIndependence.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final int NUM_RS = 2;
+  private static final String STRING_TABLE_NAME = "test";
+  private static final String TEST_FAM_STR = "fam";
+  private static final byte[] TEST_FAM = Bytes.toBytes(TEST_FAM_STR);
+  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+
+  /**
+   * Set up the config for the cluster and start it
+   * @throws Exception on failure
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_RS);
+  }
+
+  private static void setupConf(Configuration conf) {
+    // enable snapshot support
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    // disable the ui
+    conf.setInt("hbase.regionserver.info.port", -1);
+    // change the flush size to a small amount, regulating the number of store files
+    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+    // make sure we get a compaction when doing a load, but keep some
+    // files around in the store
+    conf.setInt("hbase.hstore.compaction.min", 10);
+    conf.setInt("hbase.hstore.compactionThreshold", 10);
+    // block writes if we get to 12 store files
+    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    // Avoid potentially aggressive splitting which would cause snapshot to fail
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      ConstantSizeRegionSplitPolicy.class.getName());
+    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    MobSnapshotTestingUtils.createMobTable(UTIL, TableName.valueOf(STRING_TABLE_NAME), TEST_FAM);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.deleteTable(TABLE_NAME);
+    SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
+    SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  /**
+   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+   * it is taken as an online snapshot.
+   */
+  @Test (timeout=300000)
+  public void testOnlineSnapshotAppendIndependent() throws Exception {
+    runTestSnapshotAppendIndependent(true);
+  }
+
+  /**
+   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+   * it is taken as an offline snapshot.
+   */
+  @Test (timeout=300000)
+  public void testOfflineSnapshotAppendIndependent() throws Exception {
+    runTestSnapshotAppendIndependent(false);
+  }
+
+  /**
+   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+   * when it is taken as an online snapshot.
+   */
+  @Test (timeout=300000)
+  public void testOnlineSnapshotMetadataChangesIndependent() throws Exception {
+    runTestSnapshotMetadataChangesIndependent(true);
+  }
+
+  /**
+   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+   * when it is taken as an offline snapshot.
+   */
+  @Test (timeout=300000)
+  public void testOfflineSnapshotMetadataChangesIndependent() throws Exception {
+    runTestSnapshotMetadataChangesIndependent(false);
+  }
+
+  /**
+   * Verify that region operations, in this case splitting a region, are independent between the
+   * cloned table and the original.
+   */
+  @Test (timeout=300000)
+  public void testOfflineSnapshotRegionOperationsIndependent() throws Exception {
+    runTestRegionOperationsIndependent(false);
+  }
+
+  /**
+   * Verify that region operations, in this case splitting a region, are independent between the
+   * cloned table and the original.
+   */
+  @Test (timeout=300000)
+  public void testOnlineSnapshotRegionOperationsIndependent() throws Exception {
+    runTestRegionOperationsIndependent(true);
+  }
+
+  private static void waitOnSplit(final HTable t, int originalCount) throws Exception {
+    for (int i = 0; i < 200; i++) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+      }
+      if (t.getRegionLocations().size() > originalCount) {
+        return;
+      }
+    }
+    throw new Exception("Split did not increase the number of regions");
+  }
+
+  /*
+   * Take a snapshot of a table, add data, and verify that this only
+   * affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestSnapshotAppendIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
+
-    HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+    try {
+
+      SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+      final int origTableRowCount = MobSnapshotTestingUtils.countMobRows(original);
+
+      // Take a snapshot
+      final String snapshotNameAsString = "snapshot_" + localTableName;
+      byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+        snapshotNameAsString, rootDir, fs, online);
+
+      if (!online) {
+        admin.enableTable(localTableName);
+      }
+      TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+      admin.cloneSnapshot(snapshotName, cloneTableName);
+
-      HTable clonedTable = new HTable(UTIL.getConfiguration(), cloneTableName);
++      Table clonedTable = ConnectionFactory.createConnection(UTIL.getConfiguration())
++          .getTable(cloneTableName);
+
+      try {
+        final int clonedTableRowCount = MobSnapshotTestingUtils.countMobRows(clonedTable);
+
+        Assert.assertEquals(
+          "The row counts of the original and cloned tables do not match after clone.",
+          origTableRowCount, clonedTableRowCount);
+
+        // Attempt to add data to the original table
+        final String rowKey = "new-row-" + System.currentTimeMillis();
+
+        Put p = new Put(Bytes.toBytes(rowKey));
-        p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
++        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+        original.put(p);
-        original.flushCommits();
+
+        // Verify that the put is present in the original table but not in the clone
+        Assert.assertEquals("The row count of the original table was not modified by the put",
+          origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+        Assert.assertEquals(
+          "The row count of the cloned table changed as a result of addition to the original",
+          clonedTableRowCount, MobSnapshotTestingUtils.countMobRows(clonedTable));
+
+        p = new Put(Bytes.toBytes(rowKey));
-        p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
++        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+        clonedTable.put(p);
-        clonedTable.flushCommits();
+
+        // Verify that the put to the clone does not affect the original table
+        Assert.assertEquals(
+          "The row count of the original table was modified by the put to the clone",
+          origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+        Assert.assertEquals("The row count of the cloned table was not modified by the put",
+          clonedTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(clonedTable));
+      } finally {
+
+        clonedTable.close();
+      }
+    } finally {
+
+      original.close();
+    }
+  }
+
+  /*
+   * Take a snapshot of a table, do a split, and verify that this only affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestRegionOperationsIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    // Create a table
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
-    HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+    SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+    final int loadedTableCount = MobSnapshotTestingUtils.countMobRows(original);
+    System.out.println("Original table has: " + loadedTableCount + " rows");
+
+    final String snapshotNameAsString = "snapshot_" + localTableName;
+
+    // Create a snapshot
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+      snapshotNameAsString, rootDir, fs, online);
+
+    if (!online) {
+      admin.enableTable(localTableName);
+    }
+
+    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+    // Clone the snapshot
+    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+    admin.cloneSnapshot(snapshotName, cloneTableName);
+
+    // Verify that region information is the same pre-split
-    original.clearRegionCache();
++    ((HTable)original).clearRegionCache();
+    List<HRegionInfo> originalTableHRegions = admin.getTableRegions(localTableName);
+
+    final int originalRegionCount = originalTableHRegions.size();
+    final int cloneTableRegionCount = admin.getTableRegions(cloneTableName).size();
+    Assert.assertEquals(
+      "The number of regions in the cloned table is different than in the original table.",
+      originalRegionCount,
+      cloneTableRegionCount);
+
+    // Split a region on the parent table
+    admin.splitRegion(originalTableHRegions.get(0).getRegionName());
-    waitOnSplit(original, originalRegionCount);
++    waitOnSplit((HTable)original, originalRegionCount);
+
+    // Verify that the cloned table region is not split
+    final int cloneTableRegionCount2 = admin.getTableRegions(cloneTableName).size();
+    Assert.assertEquals(
+      "The number of regions in the cloned table changed though none of its regions were split.",
+      cloneTableRegionCount, cloneTableRegionCount2);
+  }
+
+  /*
+   * Take a snapshot of a table, add metadata, and verify that this only
+   * affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestSnapshotMetadataChangesIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    // Create a table
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
-    HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++    Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+    SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+
+    final String snapshotNameAsString = "snapshot_" + localTableName;
+
+    // Create a snapshot
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+      snapshotNameAsString, rootDir, fs, online);
+
+    if (!online) {
+      admin.enableTable(localTableName);
+    }
+    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+    // Clone the snapshot
+    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+    admin.cloneSnapshot(snapshotName, cloneTableName);
+
+    // Add a new column family to the original table
+    byte[] TEST_FAM_2 = Bytes.toBytes("fam2");
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM_2);
+
+    admin.disableTable(localTableName);
+    admin.addColumn(localTableName, hcd);
+
+    // Verify that the new family is not in the clone, which was created from the snapshot
+    admin.enableTable(localTableName);
+
+    // Get the descriptors of the original and cloned tables and compare their families
+    HTableDescriptor originalTableDescriptor = original.getTableDescriptor();
+    HTableDescriptor clonedTableDescriptor = admin.getTableDescriptor(cloneTableName);
+
+    Assert.assertTrue("The original family was not found. There is something wrong.",
+      originalTableDescriptor.hasFamily(TEST_FAM));
+    Assert.assertTrue("The original family was not found in the clone. There is something wrong.",
+      clonedTableDescriptor.hasFamily(TEST_FAM));
+
+    Assert.assertTrue("The new family was not found in the original table.",
+      originalTableDescriptor.hasFamily(TEST_FAM_2));
+    Assert.assertTrue("The new family was found in the clone, but it should not be there.",
+      !clonedTableDescriptor.hasFamily(TEST_FAM_2));
+  }
+}
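
The recurring change across both test files above is the replacement of the deprecated HTable constructor with the Connection-based client API. Below is a minimal standalone sketch of that pattern, assuming an HBase 1.0-style client on the classpath; the table name, family, qualifier and value are illustrative only and not taken from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ConnectionApiSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The Connection is heavyweight and owns the client resources. Unlike the
        // inline createConnection().getTable(...) calls in the tests above, code
        // outside tests should hold a reference and close it; try-with-resources
        // closes both the Table and the Connection here.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("exampleTable"))) {
          Put p = new Put(Bytes.toBytes("row1"));
          // addColumn(...) replaces the deprecated Put.add(byte[], byte[], byte[]).
          p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          // Table.put() sends the mutation immediately; there is no flushCommits()
          // as on the old HTable, since client-side buffering moved elsewhere.
          table.put(p);
        }
      }
    }

Whether to share one Connection per test class (closing it in tearDown) or create one inline per table is a design choice; the patch opts for inline creation, trading a small resource leak in test code for brevity, since the mini-cluster JVM exits at the end of the run anyway.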
