http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java index 0000000,bdec887..0dbacfb mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java @@@ -1,0 -1,41 +1,41 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.mapreduce; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.hbase.KeyValue; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.mob.MobFileName; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.mapreduce.Partitioner; + + /** + * The partitioner for the sweep job. + * The key is a mob file name. We bucket by date. + */ + @InterfaceAudience.Private + public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> { + + @Override + public int getPartition(Text fileName, KeyValue kv, int numPartitions) { + MobFileName mobFileName = MobFileName.create(fileName.toString()); + String date = mobFileName.getDate(); + int hash = date.hashCode(); + return (hash & Integer.MAX_VALUE) % numPartitions; + } + }
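A note on the partitioner above: getPartition() keys only on the date component of the mob file name, so every KeyValue that references a mob file written on the same day is routed to the same reducer. Masking the hash with Integer.MAX_VALUE clears the sign bit before the modulo, which keeps the partition index non-negative even when String.hashCode() returns a negative value. The following standalone sketch illustrates just that arithmetic; the class name and date strings are hypothetical and the real patch parses the file name via MobFileName.create(), which is not reproduced here.

// Standalone sketch of the bucketing arithmetic used by MobFilePathHashPartitioner.
// Assumption (not taken from the diff): dates are shown here in yyyyMMdd form purely
// for illustration; the actual date format comes from MobFileName.
public class DatePartitionSketch {

  // Mirrors getPartition(): hash the date portion and fold it into [0, numPartitions).
  static int partitionForDate(String date, int numPartitions) {
    int hash = date.hashCode();
    // Clearing the sign bit guarantees a non-negative remainder even for negative hash codes.
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    // Files from the same day always land in the same partition; other days may differ.
    System.out.println(partitionForDate("20150101", numPartitions));
    System.out.println(partitionForDate("20150101", numPartitions));
    System.out.println(partitionForDate("20150102", numPartitions));
  }
}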
http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java index 0000000,5da220f..3a06ad8 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java @@@ -1,0 -1,603 +1,603 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.mapreduce; + + import java.io.File; + import java.io.IOException; + import java.net.InetSocketAddress; + import java.util.ArrayList; + import java.util.List; + import java.util.PriorityQueue; + import java.util.Set; + import java.util.TreeSet; + import java.util.UUID; + + import org.apache.commons.lang.StringUtils; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.CommonConfigurationKeys; + import org.apache.hadoop.fs.FSDataOutputStream; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Abortable; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; ++import org.apache.hadoop.hbase.NoTagsKeyValue; + import org.apache.hadoop.hbase.ServerName; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.io.HFileLink; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; + import org.apache.hadoop.hbase.master.TableLockManager; + import org.apache.hadoop.hbase.master.TableLockManager.TableLock; + import org.apache.hadoop.hbase.mob.MobConstants; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.hbase.util.FSUtils; + import org.apache.hadoop.hbase.util.Strings; + import org.apache.hadoop.hbase.zookeeper.ZKUtil; + import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.io.SequenceFile; + import 
org.apache.hadoop.io.SequenceFile.CompressionType; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.io.Writable; + import org.apache.hadoop.io.serializer.JavaSerialization; + import org.apache.hadoop.io.serializer.WritableSerialization; + import org.apache.hadoop.mapreduce.Job; + import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; + import org.apache.hadoop.net.DNS; + import org.apache.hadoop.security.Credentials; + import org.apache.hadoop.security.UserGroupInformation; + import org.apache.zookeeper.KeeperException; + + /** + * The sweep job. + * Run map reduce to merge the smaller mob files into bigger ones and cleans the unused ones. + */ + @InterfaceAudience.Private + public class SweepJob { + + private final FileSystem fs; + private final Configuration conf; + private static final Log LOG = LogFactory.getLog(SweepJob.class); + static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id"; + static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername"; + static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node"; + static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir"; + static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file"; + static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir"; + static final String WORKING_ALLNAMES_DIR = "all"; + static final String WORKING_VISITED_DIR = "visited"; + public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir"; + //the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing. + public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay"; + protected static long ONE_DAY = 24 * 60 * 60 * 1000; + private long compactionStartTime = EnvironmentEdgeManager.currentTime(); + public final static String CREDENTIALS_LOCATION = "credentials_location"; + private CacheConfig cacheConfig; + static final int SCAN_CACHING = 10000; + private TableLockManager tableLockManager; + + public SweepJob(Configuration conf, FileSystem fs) { + this.conf = conf; + this.fs = fs; + // disable the block cache. + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + cacheConfig = new CacheConfig(copyOfConf); + } + + static ServerName getCurrentServerName(Configuration conf) throws IOException { + String hostname = conf.get( + "hbase.regionserver.ipc.address", + Strings.domainNamePointerToHostName(DNS.getDefaultHost( + conf.get("hbase.regionserver.dns.interface", "default"), + conf.get("hbase.regionserver.dns.nameserver", "default")))); + int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); + // Creation of a HSA will force a resolve. + InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); + if (initialIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initialIsa); + } + return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(), + EnvironmentEdgeManager.currentTime()); + } + + /** + * Runs MapReduce to do the sweeping on the mob files. + * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob + * references from 'normal' regions' rows. + * The running of the sweep tool on the same column family are mutually exclusive. + * The HBase major compaction and running of the sweep tool on the same column family + * are mutually exclusive. + * The synchronization is done by the Zookeeper. 
+ * So in the beginning of the running, we need to make sure only this sweep tool is the only one + * that is currently running in this column family, and in this column family there're no major + * compaction in progress. + * @param tn The current table name. + * @param family The descriptor of the current column family. + * @return 0 upon success, 3 if bailing out because another compaction is currently happening, + * or 4 the mr job was unsuccessful + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + * @throws KeeperException + */ + public int sweep(TableName tn, HColumnDescriptor family) throws IOException, + ClassNotFoundException, InterruptedException, KeeperException { + Configuration conf = new Configuration(this.conf); + // check whether the current user is the same one with the owner of hbase root + String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName(); + FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR))); + if (hbaseRootFileStat.length > 0) { + String owner = hbaseRootFileStat[0].getOwner(); + if (!owner.equals(currentUserName)) { + String errorMsg = "The current user[" + currentUserName + + "] doesn't have hbase root credentials." + + " Please make sure the user is the root of the target HBase"; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } else { + LOG.error("The target HBase doesn't exist"); + throw new IOException("The target HBase doesn't exist"); + } + String familyName = family.getNameAsString(); + String id = "SweepJob" + UUID.randomUUID().toString().replace("-", ""); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable()); + try { + ServerName serverName = getCurrentServerName(conf); + tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName); + TableName lockName = MobUtils.getTableLockName(tn); + TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool"); + String tableName = tn.getNameAsString(); + // Try to obtain the lock. Use this lock to synchronize all the query + try { + lock.acquire(); + } catch (Exception e) { + LOG.warn("Can not lock the table " + tableName + + ". The major compaction in HBase may be in-progress or another sweep job is running." + + " Please re-run the job."); + return 3; + } + Job job = null; + try { + Scan scan = new Scan(); + scan.addFamily(family.getName()); + // Do not retrieve the mob data when scanning + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE)); + scan.setCaching(SCAN_CACHING); + scan.setCacheBlocks(false); + scan.setMaxVersions(family.getMaxVersions()); + conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); + conf.set(SWEEP_JOB_ID, id); + conf.set(SWEEP_JOB_SERVERNAME, serverName.toString()); + String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString()); + conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode); + job = prepareJob(tn, familyName, scan, conf); + job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName); + // Record the compaction start time. + // In the sweep tool, only the mob file whose modification time is older than + // (startTime - delay) could be handled by this tool. + // The delay is one day. It could be configured as well, but this is only used + // in the test. 
+ job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, + compactionStartTime); + + job.setPartitionerClass(MobFilePathHashPartitioner.class); + submit(job, tn, familyName); + if (job.waitForCompletion(true)) { + // Archive the unused mob files. + removeUnusedFiles(job, tn, family); + } else { + System.err.println("Job was not successful"); + return 4; + } + } finally { + try { + cleanup(job, tn, familyName); + } finally { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Failed to release the table lock " + tableName, e); + } + } + } + } finally { + zkw.close(); + } + return 0; + } + + /** + * Prepares a map reduce job. + * @param tn The current table name. + * @param familyName The current family name. + * @param scan The current scan. + * @param conf The current configuration. + * @return A map reduce job. + * @throws IOException + */ + private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf) + throws IOException { + Job job = Job.getInstance(conf); + job.setJarByClass(SweepMapper.class); + TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan, + SweepMapper.class, Text.class, Writable.class, job); + + job.setInputFormatClass(TableInputFormat.class); + job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(KeyValue.class); ++ job.setMapOutputValueClass(NoTagsKeyValue.class); + job.setReducerClass(SweepReducer.class); + job.setOutputFormatClass(NullOutputFormat.class); + String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName); + job.setJobName(jobName); + if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { + String fileLoc = conf.get(CREDENTIALS_LOCATION); + Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); + job.getCredentials().addAll(cred); + } + return job; + } + + /** + * Gets a customized job name. + * It's className-mapperClassName-reducerClassName-tableName-familyName. + * @param className The current class name. + * @param tableName The current table name. + * @param familyName The current family name. + * @return The customized job name. + */ + private static String getCustomJobName(String className, TableName tableName, String familyName) { + StringBuilder name = new StringBuilder(); + name.append(className); + name.append('-').append(SweepMapper.class.getSimpleName()); + name.append('-').append(SweepReducer.class.getSimpleName()); + name.append('-').append(tableName.getNamespaceAsString()); + name.append('-').append(tableName.getQualifierAsString()); + name.append('-').append(familyName); + return name.toString(); + } + + /** + * Submits a job. + * @param job The current job. + * @param tn The current table name. + * @param familyName The current family name. + * @throws IOException + */ + private void submit(Job job, TableName tn, String familyName) throws IOException { + // delete the temp directory of the mob files in case the failure in the previous + // execution. + Path tempDir = + new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME); + Path mobCompactionTempDir = + new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME); + Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName()); + job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString()); + // delete the working directory in case it'not deleted by the last running. + fs.delete(workingPath, true); + // create the working directory. 
+ fs.mkdirs(workingPath); + // create a sequence file which contains the names of all the existing files. + Path workingPathOfFiles = new Path(workingPath, "files"); + Path workingPathOfNames = new Path(workingPath, "names"); + job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString()); + Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR); + job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString()); + Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR); + job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString()); + // create a directory where the files contain names of visited mob files are saved. + fs.mkdirs(vistiedFileNamesPath); + Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName); + // Find all the files whose creation time are older than one day. + // Write those file names to a file. + // In each reducer there's a writer, it write the visited file names to a file which is saved + // in WORKING_VISITED_DIR. + // After the job is finished, compare those files, then find out the unused mob files and + // archive them. + FileStatus[] files = fs.listStatus(mobStorePath); + Set<String> fileNames = new TreeSet<String>(); + long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY); + for (FileStatus fileStatus : files) { + if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) { + if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) { + // only record the potentially unused files older than one day. + fileNames.add(fileStatus.getPath().getName()); + } + } + } + FSDataOutputStream fout = null; + SequenceFile.Writer writer = null; + try { + // create a file includes all the existing mob files whose creation time is older than + // (now - oneDay) + fout = fs.create(allFileNamesPath, true); + // write the names to a sequence file + writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class, + CompressionType.NONE, null); + for (String fileName : fileNames) { + writer.append(fileName, MobConstants.EMPTY_STRING); + } + writer.hflush(); + } finally { + if (writer != null) { + IOUtils.closeStream(writer); + } + if (fout != null) { + IOUtils.closeStream(fout); + } + } + } + + /** + * Gets the unused mob files. + * Compare the file which contains all the existing mob files and the visited files, + * find out the unused mob file and archive them. + * @param conf The current configuration. + * @return The unused mob files. 
+ * @throws IOException + */ + List<String> getUnusedFiles(Configuration conf) throws IOException { + // find out the unused files and archive them + Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY)); + SequenceFile.Reader allNamesReader = null; + MergeSortReader visitedNamesReader = null; + List<String> toBeArchived = new ArrayList<String>(); + try { + allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf); + visitedNamesReader = new MergeSortReader(fs, conf, + new Path(conf.get(WORKING_VISITED_DIR_KEY))); + String nextAll = (String) allNamesReader.next((String) null); + String nextVisited = visitedNamesReader.next(); + do { + if (nextAll != null) { + if (nextVisited != null) { + int compare = nextAll.compareTo(nextVisited); + if (compare < 0) { + toBeArchived.add(nextAll); + nextAll = (String) allNamesReader.next((String) null); + } else if (compare > 0) { + nextVisited = visitedNamesReader.next(); + } else { + nextAll = (String) allNamesReader.next((String) null); + nextVisited = visitedNamesReader.next(); + } + } else { + toBeArchived.add(nextAll); + nextAll = (String) allNamesReader.next((String) null); + } + } else { + break; + } + } while (nextAll != null || nextVisited != null); + } finally { + if (allNamesReader != null) { + IOUtils.closeStream(allNamesReader); + } + if (visitedNamesReader != null) { + visitedNamesReader.close(); + } + } + return toBeArchived; + } + + /** + * Archives unused mob files. + * @param job The current job. + * @param tn The current table name. + * @param hcd The descriptor of the current column family. + * @throws IOException + */ + private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException { + // find out the unused files and archive them + List<StoreFile> storeFiles = new ArrayList<StoreFile>(); + List<String> toBeArchived = getUnusedFiles(job.getConfiguration()); + // archive them + Path mobStorePath = MobUtils + .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString()); + for (String archiveFileName : toBeArchived) { + Path path = new Path(mobStorePath, archiveFileName); + storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE)); + } + if (!storeFiles.isEmpty()) { + try { + MobUtils.removeMobFiles(job.getConfiguration(), fs, tn, + FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles); + LOG.info(storeFiles.size() + " unused MOB files are removed"); + } catch (Exception e) { + LOG.error("Failed to archive the store files " + storeFiles, e); + } + } + } + + /** + * Deletes the working directory. + * @param job The current job. + * @param familyName The family to cleanup + */ + private void cleanup(Job job, TableName tn, String familyName) { + if (job != null) { + // delete the working directory + Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY)); + try { + fs.delete(workingPath, true); + } catch (IOException e) { + LOG.warn("Failed to delete the working directory after sweeping store " + familyName + + " in the table " + tn.getNameAsString(), e); + } + } + } + + /** + * A result with index. 
+ */ + private class IndexedResult implements Comparable<IndexedResult> { + private int index; + private String value; + + public IndexedResult(int index, String value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return this.index; + } + + public String getValue() { + return this.value; + } + + @Override + public int compareTo(IndexedResult o) { + if (this.value == null && o.getValue() == null) { + return 0; + } else if (o.value == null) { + return 1; + } else if (this.value == null) { + return -1; + } else { + return this.value.compareTo(o.value); + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof IndexedResult)) { + return false; + } + return compareTo((IndexedResult) obj) == 0; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + } + + /** + * Merge sort reader. + * It merges and sort the readers in different sequence files as one where + * the results are read in order. + */ + private class MergeSortReader { + + private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>(); + private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>(); + + public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException { + if (fs.exists(path)) { + FileStatus[] files = fs.listStatus(path); + int index = 0; + for (FileStatus file : files) { + if (file.isFile()) { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf); + String key = (String) reader.next((String) null); + if (key != null) { + results.add(new IndexedResult(index, key)); + readers.add(reader); + index++; + } + } + } + } + } + + public String next() throws IOException { + IndexedResult result = results.poll(); + if (result != null) { + SequenceFile.Reader reader = readers.get(result.getIndex()); + String key = (String) reader.next((String) null); + if (key != null) { + results.add(new IndexedResult(result.getIndex(), key)); + } + return result.getValue(); + } + return null; + } + + public void close() { + for (SequenceFile.Reader reader : readers) { + if (reader != null) { + IOUtils.closeStream(reader); + } + } + } + } + + /** + * The counter used in sweep job. + */ + public enum SweepCounter { + + /** + * How many files are read. + */ + INPUT_FILE_COUNT, + + /** + * How many files need to be merged or cleaned. + */ + FILE_TO_BE_MERGE_OR_CLEAN, + + /** + * How many files are left after merging. + */ + FILE_AFTER_MERGE_OR_CLEAN, + + /** + * How many records are updated. 
+ */ + RECORDS_UPDATED, + } + + public static class DummyMobAbortable implements Abortable { + + private boolean abort = false; + + public void abort(String why, Throwable e) { + abort = true; + } + + public boolean isAborted() { + return abort; + } + + } -} ++} http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java index 0000000,7844359..dea67a4 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java @@@ -1,0 -1,100 +1,100 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.mapreduce; + + import java.util.List; + import java.util.SortedSet; + import java.util.TreeSet; + -import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.master.TableLockManager; + import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName; + import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock; + import org.apache.hadoop.hbase.zookeeper.ZKUtil; + import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; + import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + import org.apache.zookeeper.KeeperException; + + /** + * Tracker on the sweep tool node in zookeeper. + * The sweep tool node is an ephemeral one, when the process dies this node is deleted, + * at that time MR might be still running, and if another sweep job is started, two MR + * for the same column family will run at the same time. + * This tracker watches this ephemeral node, if it's gone or it's not created by the + * sweep job that owns the current MR, the current process will be aborted. + */ + @InterfaceAudience.Private + public class SweepJobNodeTracker extends ZooKeeperListener { + + private String parentNode; + private String lockNodePrefix; + private String owner; + private String lockNode; + + public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) { + super(watcher); + this.parentNode = parentNode; + this.owner = owner; + this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-"); + } + + /** + * Registers the watcher on the sweep job node. + * If there's no such a sweep job node, or it's not created by the sweep job that + * owns the current MR, the current process will be aborted. + * This assumes the table lock uses the Zookeeper. 
It's a workaround and only used + * in the sweep tool, and the sweep tool will be removed after the mob file compaction + * is finished. + */ + public void start() throws KeeperException { + watcher.registerListener(this); + List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode); + if (children != null && !children.isEmpty()) { + // there are locks + TreeSet<String> sortedChildren = new TreeSet<String>(); + sortedChildren.addAll(children); + // find all the write locks + SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix); + if (!tails.isEmpty()) { + for (String tail : tails) { + String path = ZKUtil.joinZNode(parentNode, tail); + byte[] data = ZKUtil.getDataAndWatch(watcher, path); + TableLock lock = TableLockManager.fromBytes(data); + ServerName serverName = lock.getLockOwner(); + org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf( + serverName.getHostName(), serverName.getPort(), serverName.getStartCode()); + // compare the server names (host, port and start code), make sure the lock is created + if (owner.equals(sn.toString())) { + lockNode = path; + return; + } + } + } + } + System.exit(1); + } + + @Override + public void nodeDeleted(String path) { + // If the lock node is deleted, abort the current process. + if (path.equals(lockNode)) { + System.exit(1); + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java index 0000000,559d6db..7ac628c mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java @@@ -1,0 -1,87 +1,87 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.mapreduce; + + import java.io.IOException; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.KeyValueUtil; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Result; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + import org.apache.hadoop.hbase.mapreduce.TableMapper; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; + import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + import org.apache.hadoop.io.Text; + import org.apache.zookeeper.KeeperException; + + /** + * The mapper of a sweep job. 
+ * Takes the rows from the table and their results and map to <filename:Text, mobValue:KeyValue> + * where mobValue is the actual cell in HBase. + */ + @InterfaceAudience.Private + public class SweepMapper extends TableMapper<Text, KeyValue> { + + private ZooKeeperWatcher zkw = null; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID); + String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME); + String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE); + zkw = new ZooKeeperWatcher(context.getConfiguration(), id, + new DummyMobAbortable()); + try { + SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner); + tracker.start(); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + if (zkw != null) { + zkw.close(); + } + } + + @Override + public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException, + InterruptedException { + if (columns == null) { + return; + } + Cell[] cells = columns.rawCells(); + if (cells == null || cells.length == 0) { + return; + } + for (Cell c : cells) { + if (MobUtils.hasValidMobRefCellValue(c)) { + String fileName = MobUtils.getMobFileName(c); + context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c)); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java index 0000000,a2dfa29..03a05cc mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java @@@ -1,0 -1,472 +1,472 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hbase.mob.mapreduce; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FSDataOutputStream; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.PathFilter; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.InvalidFamilyOperationException; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.KeyValueUtil; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Admin; + import org.apache.hadoop.hbase.client.BufferedMutator; + import org.apache.hadoop.hbase.client.BufferedMutatorParams; + import org.apache.hadoop.hbase.client.Connection; + import org.apache.hadoop.hbase.client.ConnectionFactory; + import org.apache.hadoop.hbase.io.HFileLink; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + import org.apache.hadoop.hbase.mob.MobConstants; + import org.apache.hadoop.hbase.mob.MobFile; + import org.apache.hadoop.hbase.mob.MobFileName; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; + import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; + import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.DefaultMemStore; + import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.regionserver.StoreFileScanner; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.FSUtils; + import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.io.SequenceFile; + import org.apache.hadoop.io.SequenceFile.CompressionType; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.io.Writable; + import org.apache.hadoop.mapreduce.Reducer; + import org.apache.zookeeper.KeeperException; + + /** + * The reducer of a sweep job. + * This reducer merges the small mob files into bigger ones, and write visited + * names of mob files to a sequence file which is used by the sweep job to delete + * the unused mob files. + * The key of the input is a file name, the value is a collection of KeyValues + * (the value format of KeyValue is valueLength + fileName) in HBase. + * In this reducer, we could know how many cells exist in HBase for a mob file. + * If the existCellSize/mobFileSize < compactionRatio, this mob + * file needs to be merged. 
+ */ + @InterfaceAudience.Private + public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { + + private static final Log LOG = LogFactory.getLog(SweepReducer.class); + + private SequenceFile.Writer writer = null; + private MemStoreWrapper memstore; + private Configuration conf; + private FileSystem fs; + + private Path familyDir; + private CacheConfig cacheConfig; + private long compactionBegin; + private BufferedMutator table; + private HColumnDescriptor family; + private long mobCompactionDelay; + private Path mobTableDir; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + this.conf = context.getConfiguration(); + Connection c = ConnectionFactory.createConnection(this.conf); + this.fs = FileSystem.get(conf); + // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing. + mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY); + String tableName = conf.get(TableInputFormat.INPUT_TABLE); + String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY); + TableName tn = TableName.valueOf(tableName); + this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName); + Admin admin = c.getAdmin(); + try { + family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName)); + if (family == null) { + // this column family might be removed, directly return. + throw new InvalidFamilyOperationException("Column family '" + familyName + + "' does not exist. It might be removed."); + } + } finally { + try { + admin.close(); + } catch (IOException e) { + LOG.warn("Failed to close the HBaseAdmin", e); + } + } + // disable the block cache. + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + this.cacheConfig = new CacheConfig(copyOfConf); + + table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024)); + memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig); + + // The start time of the sweep tool. + // Only the mob files whose creation time is older than startTime-oneDay will be handled by the + // reducer since it brings inconsistency to handle the latest mob files. + this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0); + mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn); + } + + private SweepPartition createPartition(CompactionPartitionId id, Context context) + throws IOException { + return new SweepPartition(id, context); + } + + @Override + public void run(Context context) throws IOException, InterruptedException { + String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID); + String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME); + String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId, + new DummyMobAbortable()); + FSDataOutputStream fout = null; + try { + SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner); + tracker.start(); + setup(context); + // create a sequence contains all the visited file names in this reducer. 
+ String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY); + Path nameFilePath = new Path(dir, UUID.randomUUID().toString() + .replace("-", MobConstants.EMPTY_STRING)); + fout = fs.create(nameFilePath, true); + writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class, + String.class, CompressionType.NONE, null); + CompactionPartitionId id; + SweepPartition partition = null; + // the mob files which have the same start key and date are in the same partition. + while (context.nextKey()) { + Text key = context.getCurrentKey(); + String keyString = key.toString(); + id = createPartitionId(keyString); + if (null == partition || !id.equals(partition.getId())) { + // It's the first mob file in the current partition. + if (null != partition) { + // this mob file is in different partitions with the previous mob file. + // directly close. + partition.close(); + } + // create a new one + partition = createPartition(id, context); + } + if (partition != null) { + // run the partition + partition.execute(key, context.getValues()); + } + } + if (null != partition) { + partition.close(); + } + writer.hflush(); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + cleanup(context); + zkw.close(); + if (writer != null) { + IOUtils.closeStream(writer); + } + if (fout != null) { + IOUtils.closeStream(fout); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + } + + /** + * The mob files which have the same start key and date are in the same partition. + * The files in the same partition are merged together into bigger ones. + */ + public class SweepPartition { + + private final CompactionPartitionId id; + private final Context context; + private boolean memstoreUpdated = false; + private boolean mergeSmall = false; + private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>(); + private final List<Path> toBeDeleted = new ArrayList<Path>(); + + public SweepPartition(CompactionPartitionId id, Context context) throws IOException { + this.id = id; + this.context = context; + memstore.setPartitionId(id); + init(); + } + + public CompactionPartitionId getId() { + return this.id; + } + + /** + * Prepares the map of files. + * + * @throws IOException + */ + private void init() throws IOException { + FileStatus[] fileStats = listStatus(familyDir, id.getStartKey()); + if (null == fileStats) { + return; + } + + int smallFileCount = 0; + float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO); + long compactionMergeableSize = conf.getLong( + MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE, + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE); + // list the files. Just merge the hfiles, don't merge the hfile links. + // prepare the map of mob files. The key is the file name, the value is the file status. + for (FileStatus fileStat : fileStats) { + MobFileStatus mobFileStatus = null; + if (!HFileLink.isHFileLink(fileStat.getPath())) { + mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize); + if (mobFileStatus.needMerge()) { + smallFileCount++; + } + // key is file name (not hfile name), value is hfile status. + fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus); + } + } + if (smallFileCount >= 2) { + // merge the files only when there're more than 1 files in the same partition. 
+ this.mergeSmall = true; + } + } + + /** + * Flushes the data into mob files and store files, and archives the small + * files after they're merged. + * @throws IOException + */ + public void close() throws IOException { + if (null == id) { + return; + } + // flush remain key values into mob files + if (memstoreUpdated) { + memstore.flushMemStore(); + } + List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size()); + // delete samll files after compaction + for (Path path : toBeDeleted) { + LOG.info("[In Partition close] Delete the file " + path + " in partition close"); + storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE)); + } + if (!storeFiles.isEmpty()) { + try { + MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(), + storeFiles); + context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size()); + } catch (IOException e) { + LOG.error("Failed to archive the store files " + storeFiles, e); + } + storeFiles.clear(); + } + fileStatusMap.clear(); + } + + /** + * Merges the small mob files into bigger ones. + * @param fileName The current mob file name. + * @param values The collection of KeyValues in this mob file. + * @throws IOException + */ + public void execute(Text fileName, Iterable<KeyValue> values) throws IOException { + if (null == values) { + return; + } + MobFileName mobFileName = MobFileName.create(fileName.toString()); + LOG.info("[In reducer] The file name: " + fileName.toString()); + MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName()); + if (null == mobFileStat) { + LOG.info("[In reducer] Cannot find the file, probably this record is obsolete"); + return; + } + // only handle the files that are older then one day. + if (compactionBegin - mobFileStat.getFileStatus().getModificationTime() + <= mobCompactionDelay) { + return; + } + // write the hfile name + writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING); + Set<KeyValue> kvs = new HashSet<KeyValue>(); + for (KeyValue kv : values) { + if (kv.getValueLength() > Bytes.SIZEOF_INT) { + mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(), + Bytes.SIZEOF_INT)); + } + kvs.add(kv.createKeyOnly(false)); + } + // If the mob file is a invalid one or a small one, merge it into new/bigger ones. + if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) { + context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1); + MobFile file = MobFile.create(fs, + new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig); + StoreFileScanner scanner = null; + file.open(); + try { + scanner = file.getScanner(); + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY)); + Cell cell; + while (null != (cell = scanner.next())) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue keyOnly = kv.createKeyOnly(false); + if (kvs.contains(keyOnly)) { + // write the KeyValue existing in HBase to the memstore. + memstore.addToMemstore(kv); + memstoreUpdated = true; + } + } + } finally { + if (scanner != null) { + scanner.close(); + } + file.close(); + } + toBeDeleted.add(mobFileStat.getFileStatus().getPath()); + } + } + + /** + * Lists the files with the same prefix. + * @param p The file path. + * @param prefix The prefix. + * @return The files with the same prefix. 
+ * @throws IOException + */ + private FileStatus[] listStatus(Path p, String prefix) throws IOException { + return fs.listStatus(p, new PathPrefixFilter(prefix)); + } + } + + static class PathPrefixFilter implements PathFilter { + + private final String prefix; + + public PathPrefixFilter(String prefix) { + this.prefix = prefix; + } + + public boolean accept(Path path) { + return path.getName().startsWith(prefix, 0); + } + + } + + /** + * Creates the partition id. + * @param fileNameAsString The current file name, in string. + * @return The partition id. + */ + private CompactionPartitionId createPartitionId(String fileNameAsString) { + MobFileName fileName = MobFileName.create(fileNameAsString); + return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate()); + } + + /** + * The mob file status used in the sweep reduecer. + */ + private static class MobFileStatus { + private FileStatus fileStatus; + private int validSize; + private long size; + + private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO; + private long compactionMergeableSize = + MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE; + + /** + * @param fileStatus The current FileStatus. + * @param compactionRatio compactionRatio the invalid ratio. + * If there're too many cells deleted in a mob file, it's regarded as invalid, + * and needs to be written to a new one. + * If existingCellSize/fileSize < compactionRatio, it's regarded as a invalid one. + * @param compactionMergeableSize compactionMergeableSize If the size of a mob file is less + * than this value, it's regarded as a small file and needs to be merged + */ + public MobFileStatus(FileStatus fileStatus, float compactionRatio, + long compactionMergeableSize) { + this.fileStatus = fileStatus; + this.size = fileStatus.getLen(); + validSize = 0; + this.compactionRatio = compactionRatio; + this.compactionMergeableSize = compactionMergeableSize; + } + + /** + * Add size to this file. + * @param size The size to be added. + */ + public void addValidSize(int size) { + this.validSize += size; + } + + /** + * Whether the mob files need to be cleaned. + * If there're too many cells deleted in this mob file, it needs to be cleaned. + * @return True if it needs to be cleaned. + */ + public boolean needClean() { + return validSize < compactionRatio * size; + } + + /** + * Whether the mob files need to be merged. + * If this mob file is too small, it needs to be merged. + * @return True if it needs to be merged. + */ + public boolean needMerge() { + return this.size < compactionMergeableSize; + } + + /** + * Gets the file status. + * @return The file status. + */ + public FileStatus getFileStatus() { + return fileStatus; + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java index 0000000,5436554..e20186e mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java @@@ -1,0 -1,119 +1,119 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob.mapreduce; + + import java.io.IOException; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.conf.Configured; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.hbase.HBaseConfiguration; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.classification.InterfaceStability; + import org.apache.hadoop.hbase.client.HBaseAdmin; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.util.Tool; + import org.apache.hadoop.util.ToolRunner; + import org.apache.zookeeper.KeeperException; + + import com.google.protobuf.ServiceException; + + /** + * The sweep tool. It deletes the mob files that are not used and merges the small mob files to + * bigger ones. Each run of this sweep tool only handles one column family. The runs on + * the same column family are mutually exclusive. And the major compaction and sweep tool on the + * same column family are mutually exclusive too. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public class Sweeper extends Configured implements Tool { + + /** + * Sweeps the mob files on one column family. It deletes the unused mob files and merges + * the small mob files into bigger ones. + * @param tableName The current table name in string format. + * @param familyName The column family name. + * @return 0 if success, 2 if job aborted with an exception, 3 if unable to start due to + * other compaction,4 if mr job was unsuccessful + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + * @throws KeeperException + * @throws ServiceException + */ + int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException, + ClassNotFoundException, KeeperException, ServiceException { + Configuration conf = getConf(); + // make sure the target HBase exists. 
+ HBaseAdmin.checkHBaseAvailable(conf); + HBaseAdmin admin = new HBaseAdmin(conf); + try { + FileSystem fs = FileSystem.get(conf); + TableName tn = TableName.valueOf(tableName); + HTableDescriptor htd = admin.getTableDescriptor(tn); + HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName)); + if (family == null || !family.isMobEnabled()) { + throw new IOException("Column family " + familyName + " is not a MOB column family"); + } + SweepJob job = new SweepJob(conf, fs); + // Run the sweeping + return job.sweep(tn, family); + } catch (Exception e) { + System.err.println("Job aborted due to exception " + e); + return 2; // job failed + } finally { + try { + admin.close(); + } catch (IOException e) { + System.out.println("Failed to close the HBaseAdmin: " + e.getMessage()); + } + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new Sweeper(), args); + System.exit(ret); + } + + private void printUsage() { + System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName() + + " tableName familyName"); + System.err.println(" tableName The table name"); + System.err.println(" familyName The column family name"); + } + + /** + * Main method for the tool. + * @return 0 if success, 1 for bad args. 2 if job aborted with an exception, + * 3 if unable to start due to other compaction, 4 if mr job was unsuccessful + */ + public int run(String[] args) throws Exception { + if (args.length != 2) { + printUsage(); + return 1; + } + String table = args[0]; + String family = args[1]; + return sweepFamily(table, family); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 519edde,04d2b13..51e1a2d --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@@ -71,12 -79,15 +79,15 @@@ public class DefaultStoreEngine extend } catch (Exception e) { throw new IOException("Unable to load configured compactor '" + className + "'", e); } - className = conf.get( + } + + protected void createCompactionPolicy(Configuration conf, Store store) throws IOException { + String className = conf.get( - DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); + DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); try { compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[]{Configuration.class, StoreConfigInformation.class}, - new Object[]{conf, store}); + new Class[] { Configuration.class, StoreConfigInformation.class }, + new Object[] { conf, store }); } catch (Exception e) { throw new IOException("Unable to load configured compaction policy '" + className + "'", e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --cc 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 0000000,8f12656..ad0c4d7 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@@ -1,0 -1,585 +1,585 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.regionserver; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.util.ArrayList; + import java.util.Date; + import java.util.List; + import java.util.Map; + import java.util.NavigableSet; + import java.util.UUID; + import java.util.concurrent.ConcurrentHashMap; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellComparator; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.KeyValue.Type; + import org.apache.hadoop.hbase.TableName; + import org.apache.hadoop.hbase.Tag; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.filter.Filter; + import org.apache.hadoop.hbase.filter.FilterList; + import org.apache.hadoop.hbase.io.compress.Compression; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; + import org.apache.hadoop.hbase.io.hfile.HFileContext; + import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; + import org.apache.hadoop.hbase.master.TableLockManager; + import org.apache.hadoop.hbase.master.TableLockManager.TableLock; + import org.apache.hadoop.hbase.mob.MobCacheConfig; + import org.apache.hadoop.hbase.mob.MobConstants; + import org.apache.hadoop.hbase.mob.MobFile; + import org.apache.hadoop.hbase.mob.MobFileName; + import org.apache.hadoop.hbase.mob.MobStoreEngine; + import org.apache.hadoop.hbase.mob.MobUtils; + import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; + import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.hbase.util.HFileArchiveUtil; + import org.apache.hadoop.hbase.util.IdLock; + + /** + * The store implementation to save MOBs (medium objects), it extends the HStore. 
+ * When a descriptor of a column family has the value "IS_MOB", it means this column family + * is a mob one. When an HRegion instantiates a store for this column family, an HMobStore is + * created. + * HMobStore is almost the same as HStore except that it uses different types of scanners. + * In the getScanner method, a MobStoreScanner or ReversedMobStoreScanner is returned. + * In these scanners, additional seeks in the mob files are performed after the seek + * in HBase is done. + */ + @InterfaceAudience.Private + public class HMobStore extends HStore { + private static final Log LOG = LogFactory.getLog(HMobStore.class); + private MobCacheConfig mobCacheConfig; + private Path homePath; + private Path mobFamilyPath; + private volatile long cellsCountCompactedToMob = 0; + private volatile long cellsCountCompactedFromMob = 0; + private volatile long cellsSizeCompactedToMob = 0; + private volatile long cellsSizeCompactedFromMob = 0; + private volatile long mobFlushCount = 0; + private volatile long mobFlushedCellsCount = 0; + private volatile long mobFlushedCellsSize = 0; + private volatile long mobScanCellsCount = 0; + private volatile long mobScanCellsSize = 0; + private HColumnDescriptor family; + private TableLockManager tableLockManager; + private TableName tableLockName; + private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>(); + private final IdLock keyLock = new IdLock(); + + public HMobStore(final HRegion region, final HColumnDescriptor family, + final Configuration confParam) throws IOException { + super(region, family, confParam); + this.family = family; + this.mobCacheConfig = (MobCacheConfig) cacheConf; + this.homePath = MobUtils.getMobHome(conf); + this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(), + family.getNameAsString()); + List<Path> locations = new ArrayList<Path>(2); + locations.add(mobFamilyPath); + TableName tn = region.getTableDesc().getTableName(); + locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn) + .getEncodedName(), family.getNameAsString())); + map.put(Bytes.toString(tn.getName()), locations); + if (region.getRegionServerServices() != null) { + tableLockManager = region.getRegionServerServices().getTableLockManager(); + tableLockName = MobUtils.getTableLockName(getTableName()); + } + } + + /** + * Creates the mob cache config. + */ + @Override + protected void createCacheConf(HColumnDescriptor family) { + cacheConf = new MobCacheConfig(conf, family); + } + + /** + * Gets current config. + */ + public Configuration getConfiguration() { + return this.conf; + } + + /** + * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, additional seeks in + * the mob files are performed after the seek in HBase is done. 
+ */ + @Override + protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols, + long readPt, KeyValueScanner scanner) throws IOException { + if (scanner == null) { + if (MobUtils.isRefOnlyScan(scan)) { + Filter refOnlyFilter = new MobReferenceOnlyFilter(); + Filter filter = scan.getFilter(); + if (filter != null) { + scan.setFilter(new FilterList(filter, refOnlyFilter)); + } else { + scan.setFilter(refOnlyFilter); + } + } + scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, + targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + } + return scanner; + } + + /** + * Creates the mob store engine. + */ + @Override + protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf, + CellComparator cellComparator) throws IOException { + MobStoreEngine engine = new MobStoreEngine(); + engine.createComponents(conf, store, cellComparator); + return engine; + } + + /** + * Gets the temp directory. + * @return The temp directory. + */ + private Path getTempDir() { + return new Path(homePath, MobConstants.TEMP_DIR_NAME); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey); + } + + /** + * Creates the writer for the del file in temp directory. + * The del file keeps tracking the delete markers. Its name has a suffix _del, + * the format is [0-9a-f]+(_del)?. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the del file. + * @throws IOException + */ + public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + String suffix = UUID + .randomUUID().toString().replaceAll("-", "") + "_del"; + MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix); + return createWriterInTmp(mobFileName, path, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() + .toString().replaceAll("-", "")); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. 
+ * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @return The writer for the mob file. + * @throws IOException + */ - public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, - Compression.Algorithm compression) throws IOException { ++ public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, ++ long maxKeyCount, Compression.Algorithm compression) throws IOException { + final CacheConfig writerCacheConf = mobCacheConfig; + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withIncludesMvcc(true).withIncludesTags(true) + .withCompressTags(family.isCompressTags()) + .withChecksumType(checksumType) + .withBytesPerCheckSum(bytesPerChecksum) + .withBlockSize(blocksize) + .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()) + .withEncryptionContext(cryptoContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); + + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem()) + .withFilePath(new Path(basePath, mobFileName.getFileName())) + .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Commits the mob file. + * @param sourceFile The source file. + * @param targetPath The directory path where the source file is renamed to. + * @throws IOException + */ + public void commitFile(final Path sourceFile, Path targetPath) throws IOException { + if (sourceFile == null) { + return; + } + Path dstPath = new Path(targetPath, sourceFile.getName()); + validateMobFile(sourceFile); + String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; + LOG.info(msg); + Path parent = dstPath.getParent(); + if (!region.getFilesystem().exists(parent)) { + region.getFilesystem().mkdirs(parent); + } + if (!region.getFilesystem().rename(sourceFile, dstPath)) { + throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); + } + } + + /** + * Validates a mob file by opening and closing it. + * + * @param path the path to the mob file + */ + private void validateMobFile(Path path) throws IOException { + StoreFile storeFile = null; + try { + storeFile = + new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE); + storeFile.createReader(); + } catch (IOException e) { + LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeReader(false); + } + } + } + + /** + * Reads the cell from the mob file, and the read point does not count. + * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell. + * @param reference The cell found in the HBase, its value is a path to a mob file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @return The cell found in the mob file. + * @throws IOException + */ + public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { + return resolve(reference, cacheBlocks, -1, true); + } + + /** + * Reads the cell from the mob file. + * @param reference The cell found in the HBase, its value is a path to a mob file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @param readPt the read point. 
+ * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is + * missing or corrupt. + * @return The cell found in the mob file. + * @throws IOException + */ + public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, + boolean readEmptyValueOnMobCellMiss) throws IOException { + Cell result = null; + if (MobUtils.hasValidMobRefCellValue(reference)) { + String fileName = MobUtils.getMobFileName(reference); + Tag tableNameTag = MobUtils.getTableNameTag(reference); + if (tableNameTag != null) { + byte[] tableName = tableNameTag.getValue(); + String tableNameString = Bytes.toString(tableName); + List<Path> locations = map.get(tableNameString); + if (locations == null) { + IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode()); + try { + locations = map.get(tableNameString); + if (locations == null) { + locations = new ArrayList<Path>(2); + TableName tn = TableName.valueOf(tableName); + locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString())); + locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils + .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); + map.put(tableNameString, locations); + } + } finally { + keyLock.releaseLockEntry(lockEntry); + } + } + result = readCell(locations, fileName, reference, cacheBlocks, readPt, + readEmptyValueOnMobCellMiss); + } + } + if (result == null) { + LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family," + + "qualifier,timestamp,type and tags but with an empty value to return."); + result = new KeyValue(reference.getRowArray(), reference.getRowOffset(), + reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(), + reference.getFamilyLength(), reference.getQualifierArray(), + reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(), + Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY, + 0, 0, reference.getTagsArray(), reference.getTagsOffset(), + reference.getTagsLength()); + } + return result; + } + + /** + * Reads the cell from a mob file. + * The mob file might be located in different directories. + * 1. The working directory. + * 2. The archive directory. + * Reads the cell from the files located in both of the above directories. + * @param locations The possible locations where the mob files are saved. + * @param fileName The file to be read. + * @param search The cell to be searched. + * @param cacheMobBlocks Whether the scanner should cache blocks. + * @param readPt the read point. + * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is + * missing or corrupt. + * @return The found cell. Null if there's no such a cell. + * @throws IOException + */ + private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks, + long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException { + FileSystem fs = getFileSystem(); + Throwable throwable = null; + for (Path location : locations) { + MobFile file = null; + Path path = new Path(location, fileName); + try { + file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig); + return readPt != -1 ? 
file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search, + cacheMobBlocks); + } catch (IOException e) { + mobCacheConfig.getMobFileCache().evictFile(fileName); + throwable = e; + if ((e instanceof FileNotFoundException) || + (e.getCause() instanceof FileNotFoundException)) { + LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e); + } else if (e instanceof CorruptHFileException) { + LOG.error("The mob file " + path + " is corrupt", e); + break; + } else { + throw e; + } + } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt() + mobCacheConfig.getMobFileCache().evictFile(fileName); + LOG.warn("Fail to read the cell", e); + throwable = e; + } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt() + mobCacheConfig.getMobFileCache().evictFile(fileName); + LOG.warn("Fail to read the cell", e); + throwable = e; + } finally { + if (file != null) { + mobCacheConfig.getMobFileCache().closeFile(file); + } + } + } + LOG.error("The mob file " + fileName + " could not be found in the locations " + locations + + " or it is corrupt"); + if (readEmptyValueOnMobCellMiss) { + return null; + } else if (throwable instanceof IOException) { + throw (IOException) throwable; + } else { + throw new IOException(throwable); + } + } + + /** + * Gets the mob file path. + * @return The mob file path. + */ + public Path getPath() { + return mobFamilyPath; + } + + /** + * The compaction in the store of mob. + * The cells in this store contains the path of the mob files. There might be race + * condition between the major compaction and the sweeping in mob files. + * In order to avoid this, we need mutually exclude the running of the major compaction and + * sweeping in mob files. + * The minor compaction is not affected. + * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress. + */ + @Override + public List<StoreFile> compact(CompactionContext compaction, + CompactionThroughputController throughputController) throws IOException { + // If it's major compaction, try to find whether there's a sweeper is running + // If yes, mark the major compaction as retainDeleteMarkers + if (compaction.getRequest().isAllFiles()) { + // Use the Zookeeper to coordinate. + // 1. Acquire a operation lock. + // 1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction. + // 1.2. If the lock is obtained, search the node of sweeping. + // 1.2.1. If the node is there, the sweeping is in progress, mark the major + // compaction as retainDeleteMarkers and continue the compaction. + // 1.2.2. If the node is not there, add a child to the major compaction node, and + // run the compaction directly. + TableLock lock = null; + if (tableLockManager != null) { + lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore"); + } + boolean tableLocked = false; + String tableName = getTableName().getNameAsString(); + if (lock != null) { + try { + LOG.info("Start to acquire a read lock for the table[" + tableName + + "], ready to perform the major compaction"); + lock.acquire(); + tableLocked = true; + } catch (Exception e) { + LOG.error("Fail to lock the table " + tableName, e); + } + } else { + // If the tableLockManager is null, mark the tableLocked as true. 
+ tableLocked = true; + } + try { + if (!tableLocked) { + LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table[" + + tableName + "], forcing the delete markers to be retained"); + compaction.getRequest().forceRetainDeleteMarkers(); + } + return super.compact(compaction, throughputController); + } finally { + if (tableLocked && lock != null) { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Fail to release the table lock " + tableName, e); + } + } + } + } else { + // If it's not a major compaction, continue the compaction. + return super.compact(compaction, throughputController); + } + } + + public void updateCellsCountCompactedToMob(long count) { + cellsCountCompactedToMob += count; + } + + public long getCellsCountCompactedToMob() { + return cellsCountCompactedToMob; + } + + public void updateCellsCountCompactedFromMob(long count) { + cellsCountCompactedFromMob += count; + } + + public long getCellsCountCompactedFromMob() { + return cellsCountCompactedFromMob; + } + + public void updateCellsSizeCompactedToMob(long size) { + cellsSizeCompactedToMob += size; + } + + public long getCellsSizeCompactedToMob() { + return cellsSizeCompactedToMob; + } + + public void updateCellsSizeCompactedFromMob(long size) { + cellsSizeCompactedFromMob += size; + } + + public long getCellsSizeCompactedFromMob() { + return cellsSizeCompactedFromMob; + } + + public void updateMobFlushCount() { + mobFlushCount++; + } + + public long getMobFlushCount() { + return mobFlushCount; + } + + public void updateMobFlushedCellsCount(long count) { + mobFlushedCellsCount += count; + } + + public long getMobFlushedCellsCount() { + return mobFlushedCellsCount; + } + + public void updateMobFlushedCellsSize(long size) { + mobFlushedCellsSize += size; + } + + public long getMobFlushedCellsSize() { + return mobFlushedCellsSize; + } + + public void updateMobScanCellsCount(long count) { + mobScanCellsCount += count; + } + + public long getMobScanCellsCount() { + return mobScanCellsCount; + } + + public void updateMobScanCellsSize(long size) { + mobScanCellsSize += size; + } + + public long getMobScanCellsSize() { + return mobScanCellsSize; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ----------------------------------------------------------------------
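
For context on how the Sweeper tool above is driven, the following sketch shows one way to invoke it programmatically through ToolRunner and act on the return codes documented in sweepFamily/run. The table name "mytable" and family name "mobcf" are placeholders, not values from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mob.mapreduce.Sweeper;
    import org.apache.hadoop.util.ToolRunner;

    public class SweeperLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Arguments are <tableName> <familyName>; both values here are placeholders.
        int ret = ToolRunner.run(conf, new Sweeper(), new String[] { "mytable", "mobcf" });
        switch (ret) {
          case 0: System.out.println("Sweep finished successfully"); break;
          case 1: System.err.println("Bad arguments"); break;
          case 2: System.err.println("Job aborted with an exception"); break;
          case 3: System.err.println("Unable to start: another compaction or sweep is running"); break;
          case 4: System.err.println("MapReduce job was unsuccessful"); break;
          default: System.err.println("Unexpected return code " + ret);
        }
        System.exit(ret);
      }
    }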
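
The resolve() path above lazily builds the list of candidate mob file locations per table: it checks the map, takes a per-key lock, re-checks, and only then populates the entry, so concurrent readers build it once. The standalone sketch below illustrates the same check/lock/re-check pattern with plain JDK types; the loadLocations helper and the path strings are invented for illustration and are not HBase APIs.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;

    public class LocationCache {
      private final ConcurrentHashMap<String, List<String>> cache = new ConcurrentHashMap<>();
      private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();

      public List<String> getLocations(String tableName) {
        List<String> locations = cache.get(tableName);
        if (locations == null) {
          // One lock per table name so two readers of the same table do not both build the list.
          Object lock = locks.computeIfAbsent(tableName, k -> new Object());
          synchronized (lock) {
            locations = cache.get(tableName); // re-check after acquiring the lock
            if (locations == null) {
              locations = loadLocations(tableName);
              cache.put(tableName, locations);
            }
          }
        }
        return locations;
      }

      // Stand-in: in HMobStore the two candidates are the mob family directory and the
      // store archive directory; these strings are illustrative only.
      private List<String> loadLocations(String tableName) {
        return Arrays.asList("/hbase/mobdir/data/default/" + tableName,
            "/hbase/archive/data/default/" + tableName);
      }

      public static void main(String[] args) {
        System.out.println(new LocationCache().getLocations("mytable"));
      }
    }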
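
The compact() override above coordinates with the sweep tool through a table lock: major compaction tries to take a read lock, and if the lock cannot be acquired it retains delete markers instead of failing. The sketch below expresses that fallback with a plain ReentrantReadWriteLock, assuming the sweep side holds the corresponding write lock for its run; TableLockManager is not used here, and the sleep and println are stand-ins for the real work.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class CompactionSweepCoordination {
      private final ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock();
      private boolean retainDeleteMarkers = false;

      // A sweep would hold the write lock for its whole run (assumption in this sketch).
      public void runSweep() throws InterruptedException {
        tableLock.writeLock().lock();
        try {
          Thread.sleep(100); // stand-in for the sweep work
        } finally {
          tableLock.writeLock().unlock();
        }
      }

      // Major compaction only needs the read lock; if it cannot get it, a sweep is
      // probably running, so fall back to retaining delete markers rather than aborting.
      public void runMajorCompaction() throws InterruptedException {
        boolean locked = tableLock.readLock().tryLock(1, TimeUnit.SECONDS);
        try {
          if (!locked) {
            retainDeleteMarkers = true;
          }
          System.out.println("compacting, retainDeleteMarkers=" + retainDeleteMarkers);
        } finally {
          if (locked) {
            tableLock.readLock().unlock();
          }
        }
      }

      public static void main(String[] args) throws Exception {
        CompactionSweepCoordination c = new CompactionSweepCoordination();
        Thread sweep = new Thread(() -> {
          try { c.runSweep(); } catch (InterruptedException ignored) { }
        });
        sweep.start();
        c.runMajorCompaction();
        sweep.join();
      }
    }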
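
The createDelFileWriterInTmp javadoc above states that del file names carry a "_del" suffix and match [0-9a-f]+(_del)?, and the writer methods build the unique part of the name from a UUID with the dashes stripped. The small sketch below only demonstrates that naming convention against the documented pattern; it does not touch MobFileName or any other HBase class.

    import java.util.UUID;
    import java.util.regex.Pattern;

    public class DelFileSuffix {
      // Pattern from the createDelFileWriterInTmp javadoc.
      private static final Pattern NAME_PATTERN = Pattern.compile("[0-9a-f]+(_del)?");

      public static void main(String[] args) {
        // A UUID with the dashes stripped, as used by createWriterInTmp/createDelFileWriterInTmp.
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        String mobSuffix = uuid;
        String delSuffix = uuid + "_del";
        System.out.println(mobSuffix + " matches: " + NAME_PATTERN.matcher(mobSuffix).matches());
        System.out.println(delSuffix + " matches: " + NAME_PATTERN.matcher(delSuffix).matches());
      }
    }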
